热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
// file: src/handler.js
// Node.js 18.x (CommonJS). HTTP API: POST /api/register
// 目标:请求体验证、结构化日志(中间件式)、统一错误映射(400/409/500)、CORS、速率限制示例、可配置超时与重试策略、SMTP 占位发送验证码
'use strict';
const crypto = require('node:crypto');
// ----------------------- Tunables (override via environment variables) -----------------------
const CORS_ORIGIN = process.env.CORS_ORIGIN || '*';
const RATE_LIMIT_WINDOW_MS = parseInt(process.env.RATE_LIMIT_WINDOW_MS || '60000', 10); // 1 min window
const RATE_LIMIT_MAX = parseInt(process.env.RATE_LIMIT_MAX || '10', 10); // max requests per window
const INTERNAL_TIMEOUT_MS = parseInt(process.env.INTERNAL_TIMEOUT_MS || '5000', 10); // per-step internal operation timeout
const RETRY_MAX_ATTEMPTS = parseInt(process.env.RETRY_MAX_ATTEMPTS || '2', 10); // extra attempts after the first try
const RETRY_BASE_MS = parseInt(process.env.RETRY_BASE_MS || '200', 10); // backoff base delay
const SMTP_HOST = process.env.SMTP_HOST; // e.g. "smtp.example.com:587"
const SMTP_KEY = process.env.SMTP_KEY; // secret/password (API key)
const SMTP_USER = process.env.SMTP_USER || 'apikey'; // some providers use a fixed username (SendGrid, etc.)
const SMTP_FROM = process.env.SMTP_FROM || 'no-reply@example.com';
// ----------------------- CORS -----------------------
// Returned on every response (and on OPTIONS preflight) as a safety net.
const CORS_HEADERS = {
  'Access-Control-Allow-Origin': CORS_ORIGIN,
  'Access-Control-Allow-Headers': 'Content-Type,X-Correlation-Id',
  'Access-Control-Allow-Methods': 'POST,OPTIONS',
};
// ----------------------- Structured logging (middleware-style) -----------------------
let isColdStart = true;

/**
 * Builds a structured JSON logger bound to per-request metadata.
 * Every line is a single JSON object on stdout; `extra` fields are passed
 * through sanitize() so known secrets never reach the logs.
 */
function createLogger({ requestId, correlationId, ip, route }) {
  const base = { requestId, correlationId, route, ip };
  const write = (level, msg, extra = {}) => {
    console.log(
      JSON.stringify({
        ts: new Date().toISOString(),
        level,
        msg,
        ...base,
        coldStart: isColdStart,
        ...sanitize(extra),
      })
    );
  };
  return {
    info: (msg, extra) => write('INFO', msg, extra),
    warn: (msg, extra) => write('WARN', msg, extra),
    error: (msg, extra) => write('ERROR', msg, extra),
    // child() derives a logger with some bound fields overridden.
    child: (fields) => createLogger({ ...base, ...fields }),
  };
}
/**
 * Shallow-copies `obj` and masks known sensitive fields so they never
 * appear in log output. Only truthy values are redacted.
 */
function sanitize(obj) {
  const safe = { ...obj };
  for (const field of ['password', 'passwordHash', 'smtpKey']) {
    if (safe[field]) safe[field] = '[REDACTED]';
  }
  return safe;
}
// ----------------------- Error types and HTTP mapping -----------------------
class BadRequestError extends Error {
  /** 400 — validation failure; `details` carries per-field errors. */
  constructor(message, details) {
    super(message);
    Object.assign(this, { name: 'BadRequestError', statusCode: 400, details });
  }
}
class ConflictError extends Error {
  /** 409 — the resource already exists (duplicate registration). */
  constructor(message) {
    super(message);
    Object.assign(this, { name: 'ConflictError', statusCode: 409 });
  }
}
class TooManyRequestsError extends Error {
  /** 429 — rate limited; `retryAfterSec` hints the client back-off. */
  constructor(message, retryAfterSec) {
    super(message);
    Object.assign(this, { name: 'TooManyRequestsError', statusCode: 429, retryAfterSec });
  }
}
class InternalServerError extends Error {
  /** 500 — unexpected failure; the message is masked at the HTTP boundary. */
  constructor(message) {
    super(message);
    Object.assign(this, { name: 'InternalServerError', statusCode: 500 });
  }
}
/**
 * Builds an API Gateway HTTP API response with JSON content type and CORS
 * headers applied; string bodies pass through, objects are serialized.
 */
function httpResponse(statusCode, body, extraHeaders = {}) {
  const payload = typeof body === 'string' ? body : JSON.stringify(body);
  const headers = {
    'Content-Type': 'application/json; charset=utf-8',
    ...CORS_HEADERS,
    ...extraHeaders,
  };
  return { statusCode, headers, body: payload };
}
// ----------------------- Rate limiting (per-container demo only) -----------------------
const rateBucket = new Map(); // key => { count, resetAt }

/**
 * Fixed-window rate limiter keyed by `key` (typically the caller's IP).
 *
 * Bug fix: the original returned the *stale* resetAt of an expired slot when
 * starting a fresh window; we now always report the new window's resetAt.
 * The window size and max count are overridable parameters for testability
 * (defaults preserve the original env-driven behaviour).
 *
 * @param {string} key bucket identity
 * @param {number} [now] clock override (ms epoch)
 * @param {number} [windowMs] window length in ms
 * @param {number} [max] max requests per window
 * @returns {{allowed: boolean, remaining: number, resetAt: number}}
 */
function rateLimitCheck(key, now = Date.now(), windowMs = RATE_LIMIT_WINDOW_MS, max = RATE_LIMIT_MAX) {
  const slot = rateBucket.get(key);
  if (!slot || now >= slot.resetAt) {
    // New or expired window: reset the counter and report the NEW resetAt.
    const resetAt = now + windowMs;
    rateBucket.set(key, { count: 1, resetAt });
    return { allowed: true, remaining: max - 1, resetAt };
  }
  if (slot.count >= max) {
    return { allowed: false, remaining: 0, resetAt: slot.resetAt };
  }
  slot.count += 1;
  return { allowed: true, remaining: max - slot.count, resetAt: slot.resetAt };
}
// ----------------------- Retry + timeout wrappers -----------------------
/** Promise-based delay used by the backoff logic. */
async function sleep(ms) {
  await new Promise((resolve) => setTimeout(resolve, ms));
}
/**
 * Races `promise` against a timer; rejects with InternalServerError if the
 * operation does not settle within `ms`. The timer is cleared once the
 * underlying promise settles, so no stray handles keep the runtime alive.
 */
function withTimeout(promise, ms, label = 'operation') {
  let timer;
  const guarded = promise.finally(() => clearTimeout(timer));
  const expiry = new Promise((_, reject) => {
    timer = setTimeout(() => {
      reject(new InternalServerError(`${label} timed out after ${ms}ms`));
    }, ms);
  });
  return Promise.race([guarded, expiry]);
}
/**
 * Runs `fn` up to `retries + 1` times with capped exponential backoff plus
 * jitter. Once attempts are exhausted, throws InternalServerError wrapping
 * the last failure's message.
 */
async function retryAsync(fn, { retries = RETRY_MAX_ATTEMPTS, base = RETRY_BASE_MS, label = 'operation' } = {}) {
  let lastErr;
  for (let attempt = 0; attempt <= retries; attempt += 1) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (attempt === retries) break;
      // Exponential backoff capped at 2s, plus up to 100ms of jitter.
      const backoff = Math.min(base * 2 ** attempt, 2000) + Math.floor(Math.random() * 100);
      await sleep(backoff);
    }
  }
  throw new InternalServerError(`${label} failed after ${retries + 1} attempt(s): ${lastErr?.message || 'unknown error'}`);
}
// ----------------------- Request body validation -----------------------
/**
 * Normalizes the Lambda event body into an object.
 * Strings are JSON-parsed (400 on syntax error); objects pass through
 * unchanged; anything else is treated as a missing body (400).
 */
function parseJsonBody(eventBody) {
  if (typeof eventBody === 'string') {
    let parsed;
    try {
      parsed = JSON.parse(eventBody);
    } catch {
      throw new BadRequestError('Invalid JSON body');
    }
    return parsed;
  }
  if (eventBody !== null && typeof eventBody === 'object') {
    return eventBody;
  }
  throw new BadRequestError('Missing request body');
}
/**
 * Validates the register payload; throws BadRequestError (400) with a
 * per-field `details` array when anything is invalid.
 *
 * Bug fix: non-string values (e.g. a numeric `email`) previously hit
 * `.trim()` and threw a TypeError, surfacing as a 500 instead of a 400.
 * Both fields are now defensively type-checked before use.
 *
 * @returns {{email: string, password: string}} normalized credentials
 */
function validateRegisterInput(body) {
  const errors = [];
  const email = typeof body?.email === 'string' ? body.email.trim() : '';
  const password = typeof body?.password === 'string' ? body.password : '';
  // Simple email shape check (swap in a stricter regex or library if needed).
  const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
  if (!email || !emailRegex.test(email)) {
    errors.push({ field: 'email', message: 'Invalid email format' });
  }
  // Password strength: length >= 8, at least 3 of [lower, upper, digit, symbol].
  const cats = [
    /[a-z]/.test(password),
    /[A-Z]/.test(password),
    /[0-9]/.test(password),
    /[^A-Za-z0-9]/.test(password),
  ].filter(Boolean).length;
  if (!password || password.length < 8 || cats < 3) {
    errors.push({
      field: 'password',
      message: 'Password too weak (min length 8, include at least 3 of [lower, upper, digit, symbol])',
    });
  }
  if (errors.length) throw new BadRequestError('Validation failed', errors);
  return { email, password };
}
// ----------------------- User storage (placeholder implementation) -----------------------
// NOTE: in-memory placeholder. Replace with durable storage (DynamoDB / RDS)
// in production and enforce idempotency + uniqueness constraints there.
const _userStore = new Map(); // lowercased email -> user record
const _auditLogs = [];

/** Case-insensitive duplicate check against the placeholder store. */
async function isEmailTaken(email) {
  await sleep(10); // simulate datastore latency
  return _userStore.has(email.toLowerCase());
}

/**
 * Creates and stores a user, hashing the password with scrypt and a random
 * per-user salt. Hash format: "scrypt$<salt>$<hex digest>".
 */
async function createUser({ email, password }) {
  const salt = crypto.randomBytes(16).toString('hex');
  const derived = await new Promise((resolve, reject) => {
    crypto.scrypt(password, salt, 64, (err, key) => {
      if (err) reject(err);
      else resolve(key.toString('hex'));
    });
  });
  const user = {
    userId: crypto.randomUUID(),
    email: email.toLowerCase(),
    passwordHash: `scrypt$${salt}$${derived}`,
    createdAt: new Date().toISOString(),
  };
  _userStore.set(user.email, user);
  return user;
}

/** Appends an audit entry (timestamped) to the placeholder log. */
async function writeAuditLog(entry) {
  await sleep(5); // simulate datastore latency
  _auditLogs.push({ ...entry, at: new Date().toISOString() });
}
// ----------------------- Email sending (SMTP placeholder) -----------------------
/**
 * Parses "host[:port]" into { host, port, secure }. Port defaults to 587
 * (submission); implicit TLS is assumed only for port 465.
 * Returns null for an empty/missing host string.
 */
function parseSmtpHostPort(hostStr) {
  if (!hostStr) return null;
  const [host, rawPort] = hostStr.split(':');
  const port = rawPort ? parseInt(rawPort, 10) : 587;
  return { host, port, secure: port === 465 };
}
/**
 * Sends a one-time verification code by SMTP.
 * Degrades gracefully: returns { skipped } when SMTP env vars are absent and
 * { mocked } (log-only) when nodemailer is not installed, so registration
 * never hard-depends on email delivery.
 * @returns {Promise<{skipped?: boolean, mocked?: boolean, sent?: boolean}>}
 */
async function sendOneTimeCodeEmail({ to, code, logger }) {
  if (!SMTP_HOST || !SMTP_KEY) {
    logger.warn('SMTP not configured, skipping email send');
    return { skipped: true };
  }
  const conf = parseSmtpHostPort(SMTP_HOST);
  if (!conf?.host) throw new InternalServerError('Invalid SMTP_HOST');
  // Lazy-load the dependency: if nodemailer is missing, fall back to a
  // log-only placeholder instead of failing the request.
  let nodemailer;
  try {
    nodemailer = require('nodemailer');
  } catch {
    logger.warn('nodemailer not installed, logging OTP instead of sending email');
    logger.info('OTP generated (do not log in production)', { otp: code });
    return { mocked: true };
  }
  const transporter = nodemailer.createTransport({
    host: conf.host,
    port: conf.port,
    secure: conf.secure,
    auth: { user: SMTP_USER, pass: SMTP_KEY },
  });
  const mail = {
    from: SMTP_FROM,
    to,
    subject: 'Your verification code',
    text: `Your one-time code is: ${code}. It expires in 10 minutes.`,
    html: `<p>Your one-time code is: <b>${code}</b>. It expires in 10 minutes.</p>`,
  };
  // Retry with backoff, bounded by an overall timeout.
  await withTimeout(
    retryAsync(() => transporter.sendMail(mail), {
      retries: RETRY_MAX_ATTEMPTS,
      base: RETRY_BASE_MS,
      label: 'send email',
    }),
    INTERNAL_TIMEOUT_MS,
    'send email'
  );
  return { sent: true };
}
// ----------------------- Handler -----------------------
/**
 * Entry point for POST /api/register (AWS Lambda behind API Gateway HTTP API).
 *
 * Pipeline: CORS preflight → rate limit → method check → content-type check →
 * parse/validate body → duplicate check (409) → create user → audit log →
 * best-effort OTP email → 201.
 *
 * Known errors map to their statusCode (400/405/409/429); anything >= 500 is
 * masked as a generic Internal Server Error.
 */
exports.handler = async (event, context) => {
  const start = Date.now();
  const requestId = context.awsRequestId;
  const route = `${event.requestContext?.http?.method || ''} ${event.requestContext?.http?.path || ''}`;
  const ip = event.requestContext?.http?.sourceIp || 'unknown';
  const correlationId = event.headers?.['x-correlation-id'] || event.headers?.['X-Correlation-Id'] || crypto.randomUUID();
  const logger = createLogger({ requestId, correlationId, ip, route });
  try {
    // CORS preflight: answer OPTIONS immediately with no body.
    if ((event.requestContext?.http?.method || '').toUpperCase() === 'OPTIONS') {
      isColdStart = false;
      return {
        statusCode: 204,
        headers: { ...CORS_HEADERS },
        body: '',
      };
    }
    // Rate limiting (per-container demo, keyed by source IP).
    const rl = rateLimitCheck(ip);
    if (!rl.allowed) {
      logger.warn('Rate limited', { remaining: rl.remaining, resetAt: rl.resetAt });
      isColdStart = false;
      return httpResponse(
        429,
        { message: 'Too Many Requests' },
        { 'Retry-After': Math.ceil((rl.resetAt - Date.now()) / 1000) }
      );
    }
    // Method check: only POST is allowed (besides the OPTIONS preflight).
    const method = (event.requestContext?.http?.method || '').toUpperCase();
    if (method !== 'POST') {
      isColdStart = false;
      return httpResponse(405, { message: 'Method Not Allowed' }, { Allow: 'POST,OPTIONS' });
    }
    // Minimal Content-Type check (case-insensitive header lookup).
    const ct = event.headers?.['content-type'] || event.headers?.['Content-Type'] || '';
    if (!ct.toLowerCase().includes('application/json')) {
      throw new BadRequestError('Content-Type must be application/json');
    }
    logger.info('Incoming request');
    // Parse + validate the body (both throw BadRequestError -> 400).
    const body = parseJsonBody(event.body);
    const input = validateRegisterInput(body);
    // Business rule: reject duplicate registrations with 409.
    if (await withTimeout(isEmailTaken(input.email), INTERNAL_TIMEOUT_MS, 'check duplicate')) {
      throw new ConflictError('Email already registered');
    }
    // Create the user (retried, bounded by the internal timeout).
    const user = await withTimeout(
      retryAsync(() => createUser(input), { label: 'create user' }),
      INTERNAL_TIMEOUT_MS,
      'create user'
    );
    // Audit log entry for the creation event.
    await withTimeout(
      retryAsync(() => writeAuditLog({ type: 'USER_CREATED', userId: user.userId, email: user.email }), {
        label: 'write audit log',
      }),
      INTERNAL_TIMEOUT_MS,
      'write audit log'
    );
    // Send a one-time verification code (placeholder implementation).
    const otp = String(Math.floor(100000 + Math.random() * 900000));
    try {
      await sendOneTimeCodeEmail({ to: user.email, code: otp, logger });
      // TODO: persist the OTP with an expiry so it can be verified later.
    } catch (e) {
      // Email failure does not block registration: warn only
      // (tighten to a hard failure if the business requires it).
      logger.warn('Failed to send OTP email', { err: e.message });
    }
    const duration = Date.now() - start;
    logger.info('Register success', { userId: user.userId, durationMs: duration });
    isColdStart = false;
    return httpResponse(201, { userId: user.userId, message: 'User created' });
  } catch (err) {
    const duration = Date.now() - start;
    const statusCode = err.statusCode || 500;
    const payload =
      statusCode === 400 && err.details
        ? { message: err.message, details: err.details }
        : { message: err.message || 'Internal Server Error' };
    const extraHeaders =
      err instanceof TooManyRequestsError && err.retryAfterSec
        ? { 'Retry-After': String(err.retryAfterSec) }
        : {};
    if (statusCode >= 500) {
      // Mask internal details from the client; log them server-side only.
      logger.error('Unhandled error', { err: { name: err.name, message: err.message }, durationMs: duration });
      isColdStart = false;
      return httpResponse(500, { message: 'Internal Server Error' });
    } else {
      logger.warn('Handled error', { err: { name: err.name, message: err.message }, durationMs: duration });
      isColdStart = false;
      return httpResponse(statusCode, payload, extraHeaders);
    }
  }
};
// file: package.json
{
"name": "serverless-register-api",
"version": "1.0.0",
"description": "HTTP register function (Node.js 18.x on AWS Lambda)",
"main": "src/handler.js",
"type": "commonjs",
"license": "MIT",
"scripts": {
"lint": "eslint .",
"build": "echo 'no build step'",
"test": "node -e \"console.log('ok')\""
},
"dependencies": {
"nodemailer": "^6.9.9"
},
"devDependencies": {}
}
# file: serverless.yml (Serverless Framework v3)
service: register-api
frameworkVersion: '3'
provider:
  name: aws
  runtime: nodejs18.x
  region: ${env:AWS_REGION, 'us-east-1'}
  memorySize: 128
  timeout: 6 # function timeout (seconds) — synchronous HTTP invocations get no platform-level retry
  environment:
    CORS_ORIGIN: ${env:CORS_ORIGIN, '*'}
    RATE_LIMIT_WINDOW_MS: ${env:RATE_LIMIT_WINDOW_MS, '60000'}
    RATE_LIMIT_MAX: ${env:RATE_LIMIT_MAX, '10'}
    INTERNAL_TIMEOUT_MS: ${env:INTERNAL_TIMEOUT_MS, '5000'}
    RETRY_MAX_ATTEMPTS: ${env:RETRY_MAX_ATTEMPTS, '2'}
    RETRY_BASE_MS: ${env:RETRY_BASE_MS, '200'}
    SMTP_HOST: ${env:SMTP_HOST, ''}
    SMTP_KEY: ${env:SMTP_KEY, ''}
    SMTP_USER: ${env:SMTP_USER, 'apikey'}
    SMTP_FROM: ${env:SMTP_FROM, 'no-reply@example.com'}
functions:
  register:
    handler: src/handler.handler
    events:
      - httpApi:
          method: POST
          path: /api/register
    # API Gateway CORS: prefer enabling it at the gateway layer; the function
    # also returns CORS headers as a safety net.
    # Optional: custom IAM permissions (needed for DynamoDB / Secrets Manager access)
    # iamRoleStatements:
    #   - Effect: Allow
    #     Action:
    #       - dynamodb:PutItem
    #       - dynamodb:GetItem
    #     Resource: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/YourTable
package:
  patterns:
    - '!**/*'
    - 'src/**'
    - 'package.json'
    - 'package-lock.json'
    - 'node_modules/**'
plugins: []
以上模板可直接用于 AWS Lambda + API Gateway HTTP API 的注册端点,并提供校验、日志、错误、CORS、速率限制、超时与重试以及邮件发送的完整演示骨架。根据业务需要替换占位存储和邮件实现后即可投入使用。
# app.py
# -*- coding: utf-8 -*-
import json
import os
import time
import uuid
import logging
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, List
import boto3
from botocore.exceptions import ClientError
try:
import redis # pip install redis
except Exception:
redis = None # 允许本地/冷启动缺依赖,部署时请确保打包依赖
# -------------------------
# Environment variables
# -------------------------
ARCHIVE_BUCKET = os.getenv("ARCHIVE_BUCKET", "")
REDIS_URL = os.getenv("REDIS_URL", "")
ALERT_WEBHOOK = os.getenv("ALERT_WEBHOOK", "")  # optional
# TTL (seconds) for the "already processed" idempotency marker
PROCESSED_TTL_SECONDS = int(os.getenv("PROCESSED_TTL_SECONDS", "604800"))  # default: 7 days
# TTL (seconds) for the per-order processing lock
LOCK_TTL_SECONDS = int(os.getenv("LOCK_TTL_SECONDS", "300"))  # 5 minutes
# CloudWatch metric namespace
METRIC_NAMESPACE = os.getenv("METRIC_NAMESPACE", "OrderProcessor")
SERVICE_NAME = os.getenv("SERVICE_NAME", "order-created-consumer")
# -------------------------
# Global clients (connection reuse across warm invocations)
# -------------------------
s3 = boto3.client("s3")
_lambda = boto3.client("lambda")  # only for function metadata lookups (optional)
_redis = None  # lazily created by get_redis()
logger = logging.getLogger()
logger.setLevel(logging.INFO)
# -------------------------
# Exception taxonomy
# -------------------------
class TransientError(Exception):
    """Retryable error (network blips, throttling, temporary unavailability)."""
    pass
class PermanentError(Exception):
    """Non-retryable error (validation failure, business rule violation)."""
    pass
# -------------------------
# Utility functions
# -------------------------
def now_utc_iso() -> str:
    """Current UTC time as an ISO-8601 string with a +00:00 offset."""
    return datetime.now(tz=timezone.utc).isoformat()
def parse_iso_date(s: str) -> datetime:
    """Parse an ISO-8601 timestamp, tolerating a trailing 'Z'.

    datetime.fromisoformat (pre-3.11) only accepts the '+00:00' offset form,
    so 'Z' is normalized before parsing.
    """
    normalized = s[:-1] + "+00:00" if s.endswith("Z") else s
    return datetime.fromisoformat(normalized)
def get_redis() -> "redis.Redis":
    """Lazily create and cache the module-level Redis client.

    Raises PermanentError when REDIS_URL is unset or the redis library was not
    bundled — both are configuration problems that retrying cannot fix.
    """
    global _redis
    if _redis is None:
        if not REDIS_URL:
            raise PermanentError("Missing REDIS_URL env")
        if redis is None:
            raise PermanentError("redis library not available; bundle dependency")
        _redis = redis.Redis.from_url(
            REDIS_URL,
            socket_timeout=2,  # fail fast within the Lambda timeout budget
            socket_connect_timeout=2,
            health_check_interval=30,
            decode_responses=True,  # str in/out instead of bytes
        )
    return _redis
def is_retryable_aws_error(err: ClientError) -> bool:
    """Classify a botocore ClientError as retryable (transient) or not.

    Known throttling/server-side error codes are retryable, as is any purely
    numeric code >= 500.
    """
    retryable_codes = {
        "RequestTimeout", "Throttling", "ThrottlingException", "SlowDown",
        "InternalError", "InternalServerError", "ServiceUnavailable",
        "ProvisionedThroughputExceededException"
    }
    code = err.response.get("Error", {}).get("Code", "")
    if code in retryable_codes:
        return True
    # Numeric codes >= 500 are treated as server-side and retryable.
    try:
        return code.isdigit() and int(code) >= 500
    except Exception:
        return False
def safe_json_dumps(obj: Any) -> str:
    """Serialize to compact JSON, keeping non-ASCII characters verbatim."""
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))
def emit_emf_metrics(dimensions: Dict[str, str], metrics: Dict[str, float]):
    """Print one CloudWatch Embedded Metric Format (EMF) record to stdout.

    Metric names ending in ``_count`` are emitted with unit Count, all others
    as Milliseconds. Dimension and metric values are merged into the root
    object, as the EMF schema requires.
    See: https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html
    """
    metric_defs = [
        {"Name": name, "Unit": "Count" if name.endswith("_count") else "Milliseconds"}
        for name in metrics
    ]
    emf = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [
                {
                    "Namespace": METRIC_NAMESPACE,
                    "Dimensions": [list(dimensions.keys())],
                    "Metrics": metric_defs,
                }
            ],
        }
    }
    emf.update(dimensions)
    emf.update(metrics)
    print(safe_json_dumps(emf))
def send_alert(title: str, message: str, context: Dict[str, Any] = None):
    """Best-effort webhook alert; never raises.

    No-op when ALERT_WEBHOOK is unset. Delivery failures are logged as
    warnings only, so alerting can never break message processing.
    """
    if not ALERT_WEBHOOK:
        return
    try:
        import urllib.request
        import urllib.error
        payload = {
            "title": title,
            "message": message,
            "service": SERVICE_NAME,
            "time": now_utc_iso(),
            "context": context or {},
        }
        data = safe_json_dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            ALERT_WEBHOOK,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        # Short timeout: the alert must not eat into the Lambda time budget.
        with urllib.request.urlopen(req, timeout=2) as resp:
            resp.read()
    except Exception as e:
        logger.warning("alert_failed: %s", str(e))
def get_order_keys(order_id: str):
    """Redis keys for an order: (processed-marker key, processing-lock key)."""
    done_key = f"order:{order_id}:done"
    lock_key = f"order:{order_id}:lock"
    return done_key, lock_key
def acquire_lock(r: "redis.Redis", lock_key: str, token: str, ttl_sec: int) -> bool:
    """Try to take the processing lock: SET key token NX EX ttl.

    Returns True only when this caller created the key (i.e. owns the lock).
    """
    acquired = r.set(lock_key, token, nx=True, ex=ttl_sec)
    return bool(acquired)
def release_lock_if_owner(r: "redis.Redis", lock_key: str, token: str):
    """Atomically release the lock, but only if this caller still owns it.

    A Lua compare-and-delete guarantees we never release a lock that expired
    and was re-acquired by someone else. Redis errors are deliberately
    swallowed: the lock self-expires via its TTL anyway.
    """
    lua = """
    if redis.call("GET", KEYS[1]) == ARGV[1] then
        return redis.call("DEL", KEYS[1])
    else
        return 0
    end
    """
    try:
        r.eval(lua, 1, lock_key, token)
    except Exception:
        pass
def parse_record_body(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    Extract the business payload from an SQS record.

    Supported shapes:
      - plain SQS JSON body: {"orderId": ..., ...}
      - SNS-wrapped body:    {"Type": "Notification", "Message": "{...json...}"}

    Raises PermanentError for non-JSON bodies or unsupported structures.
    """
    try:
        body = json.loads(record.get("body", ""))
    except Exception:
        raise PermanentError("Message body is not valid JSON")
    if not isinstance(body, dict):
        raise PermanentError("Unsupported message body structure")
    if "Message" in body and "Type" in body:
        # SNS envelope: the real payload is a JSON string under "Message".
        try:
            return json.loads(body["Message"])
        except Exception:
            raise PermanentError("SNS Message is not valid JSON")
    return body
def validate_message(msg: Dict[str, Any]):
    """Validate required fields and basic types of an order-created message.

    Raises PermanentError (non-retryable) on the first violation found.
    """
    required = ["orderId", "userId", "amount", "createdAt"]
    missing = [field for field in required if field not in msg]
    if missing:
        raise PermanentError(f"Missing fields: {missing}")
    # Identity fields must be non-empty strings.
    for field in ("orderId", "userId"):
        if not isinstance(msg[field], str) or not msg[field]:
            raise PermanentError(f"{field} invalid")
    # Amount must be numeric-coercible; timestamp must be valid ISO-8601.
    try:
        float(msg["amount"])
    except Exception:
        raise PermanentError("amount invalid")
    try:
        parse_iso_date(msg["createdAt"])
    except Exception:
        raise PermanentError("createdAt invalid ISO timestamp")
def archive_order_to_s3(msg: Dict[str, Any], s3_client, bucket: str, source_id: str):
    """Write the order's archive document to S3 and return the object key.

    Key layout: order-created/y=YYYY/m=MM/d=DD/<orderId>.json (partitioned by
    the order's createdAt date). Raises TransientError for retryable S3
    failures and PermanentError for everything else.
    """
    created_at = parse_iso_date(msg["createdAt"])
    y = created_at.strftime("%Y")
    m = created_at.strftime("%m")
    d = created_at.strftime("%d")
    order_id = msg["orderId"]
    # Archive document (schema version 1)
    archive_doc = {
        "orderId": msg["orderId"],
        "userId": msg["userId"],
        "amount": float(msg["amount"]),
        "createdAt": msg["createdAt"],
        "archivedAt": now_utc_iso(),
        "sourceMessageId": source_id,
        "status": "archived",
        "topic": "order.created",
        "version": 1,
    }
    key = f"order-created/y={y}/m={m}/d={d}/{order_id}.json"
    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=safe_json_dumps(archive_doc).encode("utf-8"),
            ContentType="application/json",
            CacheControl="no-store",
        )
    except ClientError as e:
        # Map AWS errors onto the retryable / non-retryable taxonomy.
        if is_retryable_aws_error(e):
            raise TransientError(f"S3 retryable error: {e}")
        raise PermanentError(f"S3 permanent error: {e}")
    return key
def process_one_record(record: Dict[str, Any], context) -> str:
    """
    Process one SQS record end-to-end.

    Returns:
        "ok"   -- archived successfully, or already processed (idempotent hit)
        "skip" -- permanent failure: quarantined + alerted; counted as consumed
    Raises:
        TransientError -- partial failure; the item is reported for SQS retry
    """
    if not ARCHIVE_BUCKET:
        raise TransientError("ARCHIVE_BUCKET not configured")
    # Parse and validate; both raise PermanentError on malformed input, which
    # propagates to the handler and is counted as a permanent failure there.
    msg = parse_record_body(record)
    validate_message(msg)
    order_id = msg["orderId"]
    message_id = record.get("messageId", str(uuid.uuid4()))
    approx_receive = int(record.get("attributes", {}).get("ApproximateReceiveCount", "1"))
    r = get_redis()
    processed_key, lock_key = get_order_keys(order_id)
    # Idempotency: if already processed, acknowledge immediately.
    if r.exists(processed_key):
        logger.info(safe_json_dumps({
            "event": "idempotent_hit",
            "orderId": order_id,
            "messageId": message_id
        }))
        return "ok"
    # Take the processing lock to avoid concurrent duplicate work.
    token = context.aws_request_id if getattr(context, "aws_request_id", None) else str(uuid.uuid4())
    if not acquire_lock(r, lock_key, token, LOCK_TTL_SECONDS):
        # Another instance is processing this order; raise retryable so this
        # delivery is retried after the peer finishes (or its lock expires).
        raise TransientError(f"processing_in_progress orderId={order_id}")
    try:
        # Archive the order document to S3.
        key = archive_order_to_s3(msg, s3, ARCHIVE_BUCKET, message_id)
        # Mark as processed (idempotency marker with TTL).
        r.set(processed_key, now_utc_iso(), ex=PROCESSED_TTL_SECONDS)
        logger.info(safe_json_dumps({
            "event": "archived",
            "orderId": order_id,
            "s3key": key,
            "messageId": message_id
        }))
        return "ok"
    except PermanentError as pe:
        # Non-retryable: quarantine under an errors/ prefix and alert, so the
        # message is not retried pointlessly.
        try:
            quarantine_key = f"errors/order-created/{order_id}-{message_id}.json"
            s3.put_object(
                Bucket=ARCHIVE_BUCKET,
                Key=quarantine_key,
                Body=safe_json_dumps({
                    "error": str(pe),
                    "message": msg,
                    "archivedAt": now_utc_iso(),
                    "receiveCount": approx_receive,
                    "messageId": message_id
                }).encode("utf-8"),
                ContentType="application/json",
                CacheControl="no-store",
            )
        except Exception:
            # A quarantine failure must not mask the primary outcome.
            pass
        send_alert(
            title="[PermanentError] order.created",
            message=str(pe),
            context={"orderId": order_id, "messageId": message_id, "receiveCount": approx_receive},
        )
        logger.error(safe_json_dumps({
            "event": "permanent_error",
            "orderId": order_id,
            "messageId": message_id,
            "error": str(pe)
        }))
        # Do not retry.
        return "skip"
    finally:
        release_lock_if_owner(r, lock_key, token)
def handler(event, context):
    """
    SQS-triggered batch Lambda.

    - Supports Partial Batch Response: only failed items are retried
    - Emits per-item and batch duration/count metrics (EMF)
    """
    batch_start = time.perf_counter()
    batch_received = len(event.get("Records", []))
    success_count = 0
    skip_count = 0
    transient_fail_count = 0
    permanent_fail_count = 0  # judged permanently failed; retry was skipped
    item_failures: List[Dict[str, str]] = []
    for record in event.get("Records", []):
        per_start = time.perf_counter()
        mid = record.get("messageId", "unknown")
        try:
            status = process_one_record(record, context)
            if status == "ok":
                success_count += 1
            elif status == "skip":
                skip_count += 1
        except TransientError as te:
            transient_fail_count += 1
            item_failures.append({"itemIdentifier": mid})
            logger.warning(safe_json_dumps({
                "event": "transient_error",
                "messageId": mid,
                "error": str(te)
            }))
            # Alert when the message is about to enter the DLQ
            # (receive count at the redrive threshold).
            try:
                approx_receive = int(record.get("attributes", {}).get("ApproximateReceiveCount", "1"))
            except Exception:
                approx_receive = 1
            if approx_receive >= 3:
                send_alert(
                    title="[RetryExhausted] order.created",
                    message=str(te),
                    context={"messageId": mid, "receiveCount": approx_receive},
                )
        except PermanentError as pe:
            permanent_fail_count += 1
            # Retry skipped; already logged and quarantined downstream.
        except Exception as e:
            # Unclassified exceptions are treated as retryable.
            transient_fail_count += 1
            item_failures.append({"itemIdentifier": mid})
            logger.error(safe_json_dumps({
                "event": "unexpected_error",
                "messageId": mid,
                "error": str(e),
                "trace": traceback.format_exc()
            }))
        finally:
            per_dur_ms = (time.perf_counter() - per_start) * 1000.0
            print(safe_json_dumps({
                "event": "item_processed",
                "messageId": mid,
                "duration_ms": round(per_dur_ms, 2)
            }))
    batch_dur_ms = (time.perf_counter() - batch_start) * 1000.0
    # Emit EMF metrics for the batch.
    dims = {
        "service": SERVICE_NAME,
        "function": os.getenv("AWS_LAMBDA_FUNCTION_NAME", "unknown"),
        "stage": os.getenv("STAGE", "prod"),
    }
    emit_emf_metrics(
        dimensions=dims,
        metrics={
            "received_count": float(batch_received),
            "success_count": float(success_count),
            "skip_count": float(skip_count),
            "transient_fail_count": float(transient_fail_count),
            "permanent_fail_count": float(permanent_fail_count),
            "batch_duration_ms": round(batch_dur_ms, 2),
        },
    )
    # Partial Batch Response: only the failed items are retried by SQS.
    return {"batchItemFailures": item_failures}
requirements.txt
boto3==1.35.0
redis==4.6.0
使用 AWS SAM 示例(可根据需要改为 CDK 或 Serverless Framework):
# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Order Created Consumer - SQS -> Lambda -> S3 with Redis Idempotency
Parameters:
ArchiveBucketName:
Type: String
Description: S3 bucket name for archiving order summaries
RedisUrl:
Type: String
Description: Redis connection URL (e.g., rediss://user:pass@host:6379/0)
AlertWebhook:
Type: String
Default: ""
Description: Optional alert webhook URL
BatchSize:
Type: Number
Default: 10
MinValue: 1
MaxValue: 10
MaxConcurrency:
Type: Number
Default: 10
MinValue: 2
MaxValue: 100
ProcessedTtlSeconds:
Type: Number
Default: 604800
LockTtlSeconds:
Type: Number
Default: 300
Resources:
OrderCreatedDLQ:
Type: AWS::SQS::Queue
Properties:
QueueName: order-created-dlq
OrderCreatedQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: order-created
RedrivePolicy:
deadLetterTargetArn: !GetAtt OrderCreatedDLQ.Arn
maxReceiveCount: 3 # 失败重试最多3次,超出进入DLQ
OrderCreatedConsumer:
Type: AWS::Serverless::Function
Properties:
FunctionName: order-created-consumer
Runtime: python3.9
Handler: app.handler
Timeout: 30
MemorySize: 512
CodeUri: ./
Environment:
Variables:
ARCHIVE_BUCKET: !Ref ArchiveBucketName
REDIS_URL: !Ref RedisUrl
ALERT_WEBHOOK: !Ref AlertWebhook
METRIC_NAMESPACE: OrderPipeline
SERVICE_NAME: order-created-consumer
PROCESSED_TTL_SECONDS: !Ref ProcessedTtlSeconds
LOCK_TTL_SECONDS: !Ref LockTtlSeconds
Policies:
- S3WritePolicy:
BucketName: !Ref ArchiveBucketName
- CloudWatchLogsFullAccess
Events:
SQSTrigger:
Type: SQS
Properties:
Queue: !GetAtt OrderCreatedQueue.Arn
BatchSize: !Ref BatchSize
FunctionResponseTypes:
- ReportBatchItemFailures
ScalingConfig:
MaximumConcurrency: !Ref MaxConcurrency
      # 如需访问 VPC 内的 Redis,请在此配置 VpcConfig(子网、安全组)
# VpcConfig:
# SecurityGroupIds: [sg-xxxxxxxx]
# SubnetIds: [subnet-aaaa, subnet-bbbb]
Outputs:
QueueUrl:
Value: !Ref OrderCreatedQueue
DLQUrl:
Value: !Ref OrderCreatedDLQ
说明:
如需改为 Azure Functions(Service Bus)或 GCP(Pub/Sub + Cloud Storage),我可在保持相同业务语义下输出对应平台模板。
// 文件:main.go
// 功能:当 S3 bucket 的 images/ 前缀有 JPG/PNG 新文件上传时,生成 200x200 居中裁剪的 JPEG 缩略图(质量80),写回到 OUTPUT_PREFIX(默认 thumbnails/)同名路径;
// - 若目标已存在则跳过
// - 对非图片或超过 MAX_SIZE_MB 的文件记录告警并结束
// - 输出结构化日志和耗时指标
// 环境变量:
// OUTPUT_PREFIX (默认 "thumbnails/")
// MAX_SIZE_MB (默认 10)
// CONCURRENCY (默认 5)
package main
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"image"
"image/jpeg"
_ "image/png" // 注册 PNG 解码
"io"
"net/url"
"os"
"path"
"strings"
"sync"
"time"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-lambda-go/lambdacontext"
"github.com/aws/aws-lambda-go/lambda"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
s3 "github.com/aws/aws-sdk-go-v2/service/s3"
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
smithy "github.com/aws/smithy-go"
smithyhttp "github.com/aws/smithy-go/transport/http"
"github.com/disintegration/imaging"
"github.com/rwcarlsen/goexif/exif"
)
const (
	sourcePrefix = "images/" // only keys under this prefix are processed
	thumbSize    = 200       // square thumbnail edge length, px
	jpgQuality   = 80        // JPEG encoder quality for thumbnails
)

// AppConfig holds runtime configuration resolved from environment variables.
type AppConfig struct {
	OutputPrefix string // destination key prefix, normalized to "name/" form
	MaxSizeBytes int64  // source objects larger than this are skipped
	Concurrency  int    // max records processed in parallel per invocation
}

var (
	s3cl  *s3.Client // shared S3 client, reused across warm invocations
	appcf AppConfig  // resolved configuration, loaded once in init()
)
// init builds the shared AWS client and loads configuration once per
// container, so warm invocations reuse connections.
func init() {
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		// Fail fast: without AWS config the function cannot do anything useful.
		panic(fmt.Errorf("load aws config: %w", err))
	}
	s3cl = s3.NewFromConfig(cfg)
	appcf = loadConfig()
}
// main hands the handler to the Lambda runtime.
func main() {
	lambda.Start(handler)
}
// handler processes all records of one S3 event concurrently, bounded by
// appcf.Concurrency, and returns the first per-record error (if any) so the
// invocation is marked failed.
func handler(ctx context.Context, evt events.S3Event) error {
	start := time.Now()
	reqID := requestIDFromCtx(ctx)
	logJSON("info", "invocation_start", map[string]any{
		"request_id": reqID,
		"records":    len(evt.Records),
		"config": map[string]any{
			"output_prefix":  appcf.OutputPrefix,
			"max_size_bytes": appcf.MaxSizeBytes,
			"concurrency":    appcf.Concurrency,
		},
	})
	// Bounded fan-out: the semaphore channel caps in-flight goroutines.
	sem := make(chan struct{}, appcf.Concurrency)
	var wg sync.WaitGroup
	errs := make(chan error, len(evt.Records))
	for _, r := range evt.Records {
		rec := r // copy the loop variable to avoid closure capture issues
		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer func() {
				<-sem
				wg.Done()
			}()
			if err := processRecord(ctx, rec); err != nil {
				errs <- err
			}
		}()
	}
	wg.Wait()
	close(errs)
	var firstErr error
	for e := range errs {
		if firstErr == nil {
			firstErr = e
		}
		// Drain the channel but report only the first error
		// (change to an aggregate if needed).
	}
	logJSON("info", "invocation_end", map[string]any{
		"request_id":  reqID,
		"duration_ms": time.Since(start).Milliseconds(),
		"error":       errToStr(firstErr),
	})
	return firstErr
}
// processRecord generates a 200x200 center-cropped JPEG thumbnail for one
// S3 "object created" record. Non-image keys, oversized objects, missing
// sources, and already-existing destinations are skipped (logged, nil error);
// only genuine processing/S3 failures are returned so Lambda retries them.
//
// Bug fix: S3 event keys arrive query-encoded ('+' for space, %XX for the
// rest). url.QueryUnescape already decodes '+' to ' ', so the previous extra
// strings.ReplaceAll(key, "+", " ") double-decoded and corrupted keys that
// contain a literal '+' (encoded as %2B). The ReplaceAll is removed, and an
// unescape error now falls back to the raw key instead of silently yielding
// "" (which made the prefix check skip the record without a trace).
func processRecord(ctx context.Context, rec events.S3EventRecord) error {
	start := time.Now()
	bucket := rec.S3.Bucket.Name
	key, uerr := url.QueryUnescape(rec.S3.Object.Key)
	if uerr != nil {
		// Fall back to the raw key rather than silently operating on "".
		key = rec.S3.Object.Key
	}
	ext := strings.ToLower(path.Ext(key))
	isAllowed := ext == ".jpg" || ext == ".jpeg" || ext == ".png"
	// Only objects under the images/ prefix are processed.
	if !strings.HasPrefix(key, sourcePrefix) {
		logJSON("debug", "skip_non_matching_prefix", map[string]any{
			"bucket": bucket, "key": key,
		})
		return nil
	}
	if !isAllowed {
		logJSON("warn", "skip_non_image_extension", map[string]any{
			"bucket": bucket, "key": key, "ext": ext,
		})
		return nil
	}
	// Destination key mirrors the source path under the output prefix.
	suffix := strings.TrimPrefix(key, sourcePrefix)
	dstKey := path.Clean(appcf.OutputPrefix + suffix)
	// Skip when the thumbnail already exists (idempotency).
	exists, err := objectExists(ctx, bucket, dstKey)
	if err != nil {
		logJSON("error", "head_destination_failed", map[string]any{
			"bucket": bucket, "key": dstKey, "error": err.Error(),
		})
		return err
	}
	if exists {
		logJSON("info", "skip_already_exists", map[string]any{
			"bucket": bucket, "src_key": key, "dst_key": dstKey,
		})
		return nil
	}
	// Check the source size via HEAD before downloading a large object.
	head, err := s3cl.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		if isNotFound(err) {
			logJSON("warn", "source_not_found", map[string]any{
				"bucket": bucket, "key": key,
			})
			return nil
		}
		return fmt.Errorf("head source: %w", err)
	}
	srcSize := int64(0)
	if head.ContentLength != nil {
		srcSize = *head.ContentLength
	}
	if appcf.MaxSizeBytes > 0 && srcSize > appcf.MaxSizeBytes {
		logJSON("warn", "skip_exceed_max_size", map[string]any{
			"bucket": bucket, "key": key, "size_bytes": srcSize, "max_size_bytes": appcf.MaxSizeBytes,
		})
		return nil
	}
	// Download the object; the size is re-checked while streaming in case
	// HEAD and the actual body disagree.
	get, err := s3cl.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		if isNotFound(err) {
			logJSON("warn", "source_not_found_get", map[string]any{
				"bucket": bucket, "key": key,
			})
			return nil
		}
		return fmt.Errorf("get source: %w", err)
	}
	defer get.Body.Close()
	maxBytes := appcf.MaxSizeBytes
	if maxBytes <= 0 {
		maxBytes = 10 << 20 // safety fallback: 10MB
	}
	var buf bytes.Buffer
	lr := &io.LimitedReader{R: get.Body, N: maxBytes + 1}
	n, err := io.Copy(&buf, lr)
	if err != nil {
		return fmt.Errorf("read source: %w", err)
	}
	if n > maxBytes {
		logJSON("warn", "skip_exceed_max_size_stream", map[string]any{
			"bucket": bucket, "key": key, "read_bytes": n, "max_size_bytes": maxBytes,
		})
		return nil
	}
	// Decode, honoring JPEG EXIF orientation. The output JPEG does not retain
	// the original EXIF payload (kept lean on purpose).
	srcImg, format, err := image.Decode(bytes.NewReader(buf.Bytes()))
	if err != nil {
		logJSON("warn", "decode_failed_not_image", map[string]any{
			"bucket": bucket, "key": key, "error": err.Error(),
		})
		return nil // not an image: log a warning and finish
	}
	if format == "jpeg" {
		if ex, err := exif.Decode(bytes.NewReader(buf.Bytes())); err == nil && ex != nil {
			srcImg = applyEXIFOrientation(srcImg, ex)
		}
	}
	// 200x200 center-cropped thumbnail (Lanczos filter, quality 80).
	thumb := imaging.Fill(srcImg, thumbSize, thumbSize, imaging.Center, imaging.Lanczos)
	var out bytes.Buffer
	if err := jpeg.Encode(&out, thumb, &jpeg.Options{Quality: jpgQuality}); err != nil {
		return fmt.Errorf("encode jpeg: %w", err)
	}
	// Upload back to S3 as image/jpeg.
	_, err = s3cl.PutObject(ctx, &s3.PutObjectInput{
		Bucket:      aws.String(bucket),
		Key:         aws.String(dstKey),
		Body:        bytes.NewReader(out.Bytes()),
		ContentType: aws.String("image/jpeg"),
		// Enable if encryption is required (example: SSE-S3)
		// ServerSideEncryption: s3types.ServerSideEncryptionAes256,
		Metadata: map[string]string{
			"x-source-key": key,
		},
	})
	if err != nil {
		return fmt.Errorf("put destination: %w", err)
	}
	logJSON("info", "thumb_generated", map[string]any{
		"bucket":        bucket,
		"src_key":       key,
		"dst_key":       dstKey,
		"src_bytes":     n,
		"dst_bytes":     out.Len(),
		"duration_ms":   time.Since(start).Milliseconds(),
		"format":        format,
		"jpeg_quality":  jpgQuality,
		"thumb_size_px": fmt.Sprintf("%dx%d", thumbSize, thumbSize),
	})
	return nil
}
// objectExists reports whether bucket/key exists via a HEAD request.
// Any 404-ish error means "does not exist"; other errors are propagated.
func objectExists(ctx context.Context, bucket, key string) (bool, error) {
	if _, err := s3cl.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	}); err != nil {
		if isNotFound(err) {
			return false, nil
		}
		return false, err
	}
	return true, nil
}
func isNotFound(err error) bool {
// 兼容多种 404/NoSuchKey
var re *smithyhttp.ResponseError
if errors.As(err, &re) {
if re.HTTPStatusCode() == 404 {
return true
}
}
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
code := apiErr.ErrorCode()
if code == "NotFound" || code == "NoSuchKey" {
return true
}
}
var nfe *s3types.NotFound
if errors.As(err, &nfe) {
return true
}
return false
}
// loadConfig resolves AppConfig from environment variables, normalizing the
// output prefix (no leading "/", always a trailing "/") and preventing a
// write-back loop into the source prefix.
func loadConfig() AppConfig {
	prefix := getEnv("OUTPUT_PREFIX", "thumbnails/")
	prefix = strings.TrimPrefix(prefix, "/")
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	if prefix == sourcePrefix {
		// Writing thumbnails under images/ would retrigger the event (loop).
		prefix = "thumbnails/"
	}
	return AppConfig{
		OutputPrefix: prefix,
		MaxSizeBytes: parseInt64(getEnv("MAX_SIZE_MB", "10")) << 20,
		Concurrency:  parseInt(getEnv("CONCURRENCY", "5")),
	}
}
func getEnv(k, def string) string {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return def
}
return v
}
// parseInt parses s as a decimal int, clamping parse failures and
// non-positive values to a minimum of 1.
func parseInt(s string) int {
	value := 0
	fmt.Sscanf(s, "%d", &value)
	if value <= 0 {
		return 1
	}
	return value
}
// parseInt64 parses s as a decimal int64, clamping parse failures and
// non-positive values to a minimum of 1.
func parseInt64(s string) int64 {
	var value int64
	fmt.Sscanf(s, "%d", &value)
	if value <= 0 {
		return 1
	}
	return value
}
// applyEXIFOrientation normalizes the image's pixels according to the EXIF
// Orientation tag so the thumbnail comes out upright. Missing or unreadable
// tags (and value 1 / unknown values) leave the image untouched.
//
// Bug fix: the previous mapping was wrong for orientations 5-8 — case 6
// produced a 90° CCW rotation (orientation 8's correction) and vice versa,
// and cases 5/7 were swapped. Note that imaging.RotateN rotates
// counter-clockwise, so EXIF "rotate 90 CW" maps to imaging.Rotate270 and
// "rotate 270 CW" maps to imaging.Rotate90.
func applyEXIFOrientation(img image.Image, ex *exif.Exif) image.Image {
	tag, err := ex.Get(exif.Orientation)
	if err != nil || tag == nil {
		return img
	}
	orient, err := tag.Int(0)
	if err != nil {
		return img
	}
	switch orient {
	case 2:
		return imaging.FlipH(img) // mirrored horizontally
	case 3:
		return imaging.Rotate180(img) // rotated 180°
	case 4:
		return imaging.FlipV(img) // mirrored vertically
	case 5:
		return imaging.Transpose(img) // mirrored horizontally + rotated 270° CW
	case 6:
		return imaging.Rotate270(img) // rotated 90° CW
	case 7:
		return imaging.Transverse(img) // mirrored horizontally + rotated 90° CW
	case 8:
		return imaging.Rotate90(img) // rotated 270° CW
	default:
		return img // 1 = normal; unknown values are left as-is
	}
}
// requestIDFromCtx extracts the AWS request ID from the Lambda context,
// returning "" when unavailable (e.g. local execution).
func requestIDFromCtx(ctx context.Context) string {
	lc, ok := lambdacontext.FromContext(ctx)
	if !ok || lc == nil {
		return ""
	}
	return lc.AwsRequestID
}
func errToStr(e error) string {
if e == nil {
return ""
}
return e.Error()
}
// logJSON emits one structured JSON log line on stdout. level, msg and a
// timestamp are stamped into the caller's field map (mutating it), matching
// the original behaviour.
func logJSON(level, msg string, fields map[string]any) {
	if fields == nil {
		fields = make(map[string]any)
	}
	fields["level"] = level
	fields["msg"] = msg
	fields["ts"] = time.Now().Format(time.RFC3339Nano)
	encoded, _ := json.Marshal(fields)
	fmt.Println(string(encoded))
}
备注:
如需移植到其他对象存储平台(如 GCS/Azure Blob/阿里云 OSS),仅需替换存储 SDK 及事件模型,核心图像处理与并发框架可原样复用。
以最少输入,在几分钟内产出可直接落地的无服务器函数模板,显著缩短从想法到可运行样例的周期。
用它快速推出后端接口与事件处理,建立统一模板库,带新人在一周内搭起可用后端;压缩试错与云账单,聚焦核心业务验证。
制定企业级无服务器规范与最佳实践,一键生成跨平台模板,统一日志与告警方案;用于治理多团队代码风格与成本。
在多项目中快速交付第三方回调、表单处理与轻量接口;减少重复体力活,把精力放在业务逻辑与用户体验。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
一次支付永久解锁,全站资源与持续更新;商业项目无限次使用