×
¥
查看详情

函数模板概述

  • 平台类型:AWS Lambda + API Gateway HTTP API
  • 运行时环境:nodejs18.x
  • 触发类型:HTTP(POST /api/register)

核心代码

// file: src/handler.js
// Node.js 18.x (CommonJS). HTTP API: POST /api/register
// 目标:请求体验证、结构化日志(中间件式)、统一错误映射(400/409/500)、CORS、速率限制示例、可配置超时与重试策略、SMTP 占位发送验证码

'use strict';

const crypto = require('node:crypto');

// ----------------------- 可调参数(通过环境变量覆盖) -----------------------
const CORS_ORIGIN = process.env.CORS_ORIGIN || '*';
const RATE_LIMIT_WINDOW_MS = parseInt(process.env.RATE_LIMIT_WINDOW_MS || '60000', 10); // 1 min
const RATE_LIMIT_MAX = parseInt(process.env.RATE_LIMIT_MAX || '10', 10); // 每窗口最大请求数
const INTERNAL_TIMEOUT_MS = parseInt(process.env.INTERNAL_TIMEOUT_MS || '5000', 10); // 单步内部操作超时
const RETRY_MAX_ATTEMPTS = parseInt(process.env.RETRY_MAX_ATTEMPTS || '2', 10);
const RETRY_BASE_MS = parseInt(process.env.RETRY_BASE_MS || '200', 10);
const SMTP_HOST = process.env.SMTP_HOST; // e.g. "smtp.example.com:587"
const SMTP_KEY = process.env.SMTP_KEY; // 密钥/密码(API Key)
const SMTP_USER = process.env.SMTP_USER || 'apikey'; // 某些服务商使用固定用户名(SendGrid等)
const SMTP_FROM = process.env.SMTP_FROM || 'no-reply@example.com';

// ----------------------- CORS -----------------------
const CORS_HEADERS = {
  'Access-Control-Allow-Origin': CORS_ORIGIN,
  'Access-Control-Allow-Headers': 'Content-Type,X-Correlation-Id',
  'Access-Control-Allow-Methods': 'POST,OPTIONS',
};

// ----------------------- 结构化日志(中间件式) -----------------------
let isColdStart = true;
function createLogger({ requestId, correlationId, ip, route }) {
  function log(level, msg, extra = {}) {
    const entry = {
      ts: new Date().toISOString(),
      level,
      msg,
      requestId,
      correlationId,
      route,
      ip,
      coldStart: isColdStart,
      ...sanitize(extra),
    };
    console.log(JSON.stringify(entry));
  }
  return {
    info: (msg, extra) => log('INFO', msg, extra),
    warn: (msg, extra) => log('WARN', msg, extra),
    error: (msg, extra) => log('ERROR', msg, extra),
    child: (fields) =>
      createLogger({
        requestId,
        correlationId,
        ip,
        route,
        ...fields,
      }),
  };
}
function sanitize(obj) {
  // 避免敏感信息落盘
  const cloned = { ...obj };
  if (cloned.password) cloned.password = '[REDACTED]';
  if (cloned.passwordHash) cloned.passwordHash = '[REDACTED]';
  if (cloned.smtpKey) cloned.smtpKey = '[REDACTED]';
  return cloned;
}

// ----------------------- 错误类型与映射 -----------------------
class BadRequestError extends Error {
  constructor(message, details) {
    super(message);
    this.name = 'BadRequestError';
    this.statusCode = 400;
    this.details = details;
  }
}
class ConflictError extends Error {
  constructor(message) {
    super(message);
    this.name = 'ConflictError';
    this.statusCode = 409;
  }
}
class TooManyRequestsError extends Error {
  constructor(message, retryAfterSec) {
    super(message);
    this.name = 'TooManyRequestsError';
    this.statusCode = 429;
    this.retryAfterSec = retryAfterSec;
  }
}
class InternalServerError extends Error {
  constructor(message) {
    super(message);
    this.name = 'InternalServerError';
    this.statusCode = 500;
  }
}

function httpResponse(statusCode, body, extraHeaders = {}) {
  return {
    statusCode,
    headers: {
      'Content-Type': 'application/json; charset=utf-8',
      ...CORS_HEADERS,
      ...extraHeaders,
    },
    body: typeof body === 'string' ? body : JSON.stringify(body),
  };
}

// ----------------------- 速率限制(容器级示例,仅演示用途) -----------------------
const rateBucket = new Map(); // key => { count, resetAt }
function rateLimitCheck(key, now = Date.now()) {
  const slot = rateBucket.get(key);
  if (!slot || now >= slot.resetAt) {
    rateBucket.set(key, { count: 1, resetAt: now + RATE_LIMIT_WINDOW_MS });
    return { allowed: true, remaining: RATE_LIMIT_MAX - 1, resetAt: slot?.resetAt || now + RATE_LIMIT_WINDOW_MS };
  } else {
    if (slot.count >= RATE_LIMIT_MAX) {
      return { allowed: false, remaining: 0, resetAt: slot.resetAt };
    }
    slot.count += 1;
    return { allowed: true, remaining: RATE_LIMIT_MAX - slot.count, resetAt: slot.resetAt };
  }
}

// ----------------------- 重试 + 超时包装 -----------------------
async function sleep(ms) {
  return new Promise((res) => setTimeout(res, ms));
}
function withTimeout(promise, ms, label = 'operation') {
  let timer;
  const timeoutPromise = new Promise((_, rej) => {
    timer = setTimeout(() => rej(new InternalServerError(`${label} timed out after ${ms}ms`)), ms);
  });
  return Promise.race([promise.finally(() => clearTimeout(timer)), timeoutPromise]);
}
async function retryAsync(fn, { retries = RETRY_MAX_ATTEMPTS, base = RETRY_BASE_MS, label = 'operation' } = {}) {
  let attempt = 0;
  let lastErr;
  while (attempt <= retries) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (attempt === retries) break;
      const backoff = Math.min(base * Math.pow(2, attempt), 2000) + Math.floor(Math.random() * 100);
      await sleep(backoff);
      attempt += 1;
    }
  }
  throw new InternalServerError(`${label} failed after ${retries + 1} attempt(s): ${lastErr?.message || 'unknown error'}`);
}

// ----------------------- 请求体验证 -----------------------
function parseJsonBody(eventBody) {
  if (typeof eventBody === 'string') {
    try {
      return JSON.parse(eventBody);
    } catch {
      throw new BadRequestError('Invalid JSON body');
    }
  }
  if (typeof eventBody === 'object' && eventBody != null) return eventBody;
  throw new BadRequestError('Missing request body');
}
function validateRegisterInput(body) {
  const errors = [];
  const email = (body?.email || '').trim();
  const password = body?.password || '';

  // 简易邮箱校验(可替换更严格的正则或库)
  const emailRegex = /^[^\s@]+@[^\s@]+\.[^\s@]+$/;
  if (!email || !emailRegex.test(email)) {
    errors.push({ field: 'email', message: 'Invalid email format' });
  }

  // 密码强度:长度>=8,至少包含三类:小写/大写/数字/特殊字符
  const cats = [
    /[a-z]/.test(password),
    /[A-Z]/.test(password),
    /[0-9]/.test(password),
    /[^A-Za-z0-9]/.test(password),
  ].filter(Boolean).length;
  if (!password || password.length < 8 || cats < 3) {
    errors.push({
      field: 'password',
      message: 'Password too weak (min length 8, include at least 3 of [lower, upper, digit, symbol])',
    });
  }

  if (errors.length) throw new BadRequestError('Validation failed', errors);
  return { email, password };
}

// ----------------------- 用户存储(占位实现) -----------------------
// 注意:以下为内存占位。生产请替换为持久化存储(如 DynamoDB / RDS),并确保幂等与唯一约束。
const _userStore = new Map(); // email -> user
const _auditLogs = [];
async function isEmailTaken(email) {
  await sleep(10);
  return _userStore.has(email.toLowerCase());
}
async function createUser({ email, password }) {
  // 使用 scrypt + salt 生成密码哈希(示例)
  const salt = crypto.randomBytes(16).toString('hex');
  const hash = await new Promise((res, rej) =>
    crypto.scrypt(password, salt, 64, (err, derivedKey) => (err ? rej(err) : res(derivedKey.toString('hex'))))
  );
  const userId = crypto.randomUUID();
  const user = {
    userId,
    email: email.toLowerCase(),
    passwordHash: `scrypt$${salt}$${hash}`,
    createdAt: new Date().toISOString(),
  };
  _userStore.set(user.email, user);
  return user;
}
async function writeAuditLog(entry) {
  await sleep(5);
  _auditLogs.push({ ...entry, at: new Date().toISOString() });
}

// ----------------------- 邮件发送(SMTP 占位) -----------------------
function parseSmtpHostPort(hostStr) {
  if (!hostStr) return null;
  const [host, portStr] = hostStr.split(':');
  const port = portStr ? parseInt(portStr, 10) : 587;
  const secure = port === 465;
  return { host, port, secure };
}
async function sendOneTimeCodeEmail({ to, code, logger }) {
  if (!SMTP_HOST || !SMTP_KEY) {
    logger.warn('SMTP not configured, skipping email send');
    return { skipped: true };
  }
  const conf = parseSmtpHostPort(SMTP_HOST);
  if (!conf?.host) throw new InternalServerError('Invalid SMTP_HOST');

  // 延迟加载依赖:若未安装 nodemailer,将回退为日志占位
  let nodemailer;
  try {
    nodemailer = require('nodemailer');
  } catch {
    logger.warn('nodemailer not installed, logging OTP instead of sending email');
    logger.info('OTP generated (do not log in production)', { otp: code });
    return { mocked: true };
  }

  const transporter = nodemailer.createTransport({
    host: conf.host,
    port: conf.port,
    secure: conf.secure,
    auth: { user: SMTP_USER, pass: SMTP_KEY },
  });

  const mail = {
    from: SMTP_FROM,
    to,
    subject: 'Your verification code',
    text: `Your one-time code is: ${code}. It expires in 10 minutes.`,
    html: `<p>Your one-time code is: <b>${code}</b>. It expires in 10 minutes.</p>`,
  };

  // 重试 + 超时保护
  await withTimeout(
    retryAsync(() => transporter.sendMail(mail), {
      retries: RETRY_MAX_ATTEMPTS,
      base: RETRY_BASE_MS,
      label: 'send email',
    }),
    INTERNAL_TIMEOUT_MS,
    'send email'
  );
  return { sent: true };
}

// ----------------------- 处理函数 -----------------------
exports.handler = async (event, context) => {
  const start = Date.now();
  const requestId = context.awsRequestId;
  const route = `${event.requestContext?.http?.method || ''} ${event.requestContext?.http?.path || ''}`;
  const ip = event.requestContext?.http?.sourceIp || 'unknown';
  const correlationId = event.headers?.['x-correlation-id'] || event.headers?.['X-Correlation-Id'] || crypto.randomUUID();

  const logger = createLogger({ requestId, correlationId, ip, route });

  try {
    // CORS 预检
    if ((event.requestContext?.http?.method || '').toUpperCase() === 'OPTIONS') {
      isColdStart = false;
      return {
        statusCode: 204,
        headers: { ...CORS_HEADERS },
        body: '',
      };
    }

    // 速率限制(示例)
    const rl = rateLimitCheck(ip);
    if (!rl.allowed) {
      logger.warn('Rate limited', { remaining: rl.remaining, resetAt: rl.resetAt });
      isColdStart = false;
      return httpResponse(
        429,
        { message: 'Too Many Requests' },
        { 'Retry-After': Math.ceil((rl.resetAt - Date.now()) / 1000) }
      );
    }

    // 方法校验
    const method = (event.requestContext?.http?.method || '').toUpperCase();
    if (method !== 'POST') {
      isColdStart = false;
      return httpResponse(405, { message: 'Method Not Allowed' }, { Allow: 'POST,OPTIONS' });
    }

    // Content-Type 简校验
    const ct = event.headers?.['content-type'] || event.headers?.['Content-Type'] || '';
    if (!ct.toLowerCase().includes('application/json')) {
      throw new BadRequestError('Content-Type must be application/json');
    }

    logger.info('Incoming request');

    // 解析 + 校验
    const body = parseJsonBody(event.body);
    const input = validateRegisterInput(body);

    // 业务:查重
    if (await withTimeout(isEmailTaken(input.email), INTERNAL_TIMEOUT_MS, 'check duplicate')) {
      throw new ConflictError('Email already registered');
    }

    // 业务:创建用户
    const user = await withTimeout(
      retryAsync(() => createUser(input), { label: 'create user' }),
      INTERNAL_TIMEOUT_MS,
      'create user'
    );

    // 审计日志
    await withTimeout(
      retryAsync(() => writeAuditLog({ type: 'USER_CREATED', userId: user.userId, email: user.email }), {
        label: 'write audit log',
      }),
      INTERNAL_TIMEOUT_MS,
      'write audit log'
    );

    // 发送一次性验证码(占位)
    const otp = String(Math.floor(100000 + Math.random() * 900000));
    try {
      await sendOneTimeCodeEmail({ to: user.email, code: otp, logger });
      // TODO: 将 OTP 持久化并设置过期时间,以便后续验证
    } catch (e) {
      // 不阻塞注册:邮件失败仅记录警告(可按需改为失败)
      logger.warn('Failed to send OTP email', { err: e.message });
    }

    const duration = Date.now() - start;
    logger.info('Register success', { userId: user.userId, durationMs: duration });

    isColdStart = false;
    return httpResponse(201, { userId: user.userId, message: 'User created' });
  } catch (err) {
    const duration = Date.now() - start;
    const statusCode = err.statusCode || 500;
    const payload =
      statusCode === 400 && err.details
        ? { message: err.message, details: err.details }
        : { message: err.message || 'Internal Server Error' };

    const extraHeaders =
      err instanceof TooManyRequestsError && err.retryAfterSec
        ? { 'Retry-After': String(err.retryAfterSec) }
        : {};

    if (statusCode >= 500) {
      // 避免输出内部细节
      logger.error('Unhandled error', { err: { name: err.name, message: err.message }, durationMs: duration });
      isColdStart = false;
      return httpResponse(500, { message: 'Internal Server Error' });
    } else {
      logger.warn('Handled error', { err: { name: err.name, message: err.message }, durationMs: duration });
      isColdStart = false;
      return httpResponse(statusCode, payload, extraHeaders);
    }
  }
};
// file: package.json
{
  "name": "serverless-register-api",
  "version": "1.0.0",
  "description": "HTTP register function (Node.js 18.x on AWS Lambda)",
  "main": "src/handler.js",
  "type": "commonjs",
  "license": "MIT",
  "scripts": {
    "lint": "eslint .",
    "build": "echo 'no build step'",
    "test": "node -e \"console.log('ok')\""
  },
  "dependencies": {
    "nodemailer": "^6.9.9"
  },
  "devDependencies": {}
}
# file: serverless.yml  (Serverless Framework v3)
service: register-api
frameworkVersion: '3'

provider:
  name: aws
  runtime: nodejs18.x
  region: ${env:AWS_REGION, 'us-east-1'}
  memorySize: 128
  timeout: 6 # 函数超时(秒)——HTTP 同步调用无平台级重试
  environment:
    CORS_ORIGIN: ${env:CORS_ORIGIN, '*'}
    RATE_LIMIT_WINDOW_MS: ${env:RATE_LIMIT_WINDOW_MS, '60000'}
    RATE_LIMIT_MAX: ${env:RATE_LIMIT_MAX, '10'}
    INTERNAL_TIMEOUT_MS: ${env:INTERNAL_TIMEOUT_MS, '5000'}
    RETRY_MAX_ATTEMPTS: ${env:RETRY_MAX_ATTEMPTS, '2'}
    RETRY_BASE_MS: ${env:RETRY_BASE_MS, '200'}
    SMTP_HOST: ${env:SMTP_HOST, ''}
    SMTP_KEY: ${env:SMTP_KEY, ''}
    SMTP_USER: ${env:SMTP_USER, 'apikey'}
    SMTP_FROM: ${env:SMTP_FROM, 'no-reply@example.com'}

functions:
  register:
    handler: src/handler.handler
    events:
      - httpApi:
          method: POST
          path: /api/register
    # API Gateway CORS(建议在网关层开启,函数中也返回CORS头以防万一)
  # 可选:自定义 IAM 权限(如访问DynamoDB/Secrets Manager时需要)
  # iamRoleStatements:
  #   - Effect: Allow
  #     Action:
  #       - dynamodb:PutItem
  #       - dynamodb:GetItem
  #     Resource: arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/YourTable
package:
  patterns:
    - '!**/*'
    - 'src/**'
    - 'package.json'
    - 'package-lock.json'
    - 'node_modules/**'
plugins: []

配置说明

  • 部署方式
    • 安装 Serverless Framework: npm i -g serverless
    • 安装依赖: npm i
    • 部署: npx serverless deploy
  • 环境变量
    • CORS_ORIGIN:允许的跨域来源,默认 *
    • RATE_LIMIT_WINDOW_MS / RATE_LIMIT_MAX:速率限制时间窗口与请求上限(容器级示例,仅演示)
    • INTERNAL_TIMEOUT_MS:内部单步操作超时毫秒(如存储、日志、邮件)
    • RETRY_MAX_ATTEMPTS / RETRY_BASE_MS:内部重试最大次数与指数退避基数
    • SMTP_HOST:SMTP 主机与可选端口(例如 smtp.example.com:587)
    • SMTP_KEY:SMTP 凭据(切勿硬编码)
    • SMTP_USER:SMTP 用户名(默认 apikey,按服务商调整)
    • SMTP_FROM:发件人邮箱
  • 路由
    • POST /api/register:请求体 { "email": "...", "password": "..." }
    • OPTIONS /api/register:CORS 预检
  • 依赖
    • nodemailer:用于SMTP发送(若不安装,会降级为日志占位)

使用指南

  • 本地测试
    • 使用 serverless-offline 插件可本地运行(可选)
    • 或通过 AWS 控制台的“测试”触发,构造 HTTP API 事件格式
  • 请求示例
    • Headers:
      • Content-Type: application/json
      • X-Correlation-Id: 可选,用于链路追踪
    • Body: { "email": "user@example.com", "password": "Str0ng!Pass" }
  • 响应
    • 201:{ "userId": "...", "message": "User created" }
    • 400:校验失败(包含 details)
    • 409:邮箱已存在
    • 429:命中速率限制(带 Retry-After)
    • 500:内部错误(不暴露细节)
  • 邮件发送
    • 配置 SMTP_HOST/SMTP_KEY(以及必要时 SMTP_USER/SMTP_FROM)
    • 未配置或未安装 nodemailer 时,将仅记录验证码日志(占位)
  • 超时与重试
    • Lambda 层超时由 serverless.yml 的 provider.timeout 控制
    • 业务内部调用使用 withTimeout + retryAsync,避免长时间阻塞

最佳实践建议

  • 速率限制
    • 代码内的 Map 仅对单个冷/热容器有效,不适合水平扩展。生产环境应在 API Gateway/WAF/CloudFront 层配置速率限制或使用全局存储(如 Redis/DynamoDB)计数。
  • 凭据管理
    • 使用 AWS Secrets Manager/SSM Parameter Store 管理 SMTP_KEY,避免明文环境变量;或通过 Lambda 环境变量加密+KMS。
  • 存储层
    • 替换内存存储为持久化存储并实现唯一约束(如 DynamoDB 使用 email 作为PK;写入时条件表达式 attribute_not_exists(email) 防止重复)。
    • 对审计日志使用独立表或流系统(Kinesis/Firehose)以解耦主流程。
  • 密码安全
    • 已使用 scrypt 示例。生产配置应明确参数(N、r、p)并进行密钥拉伸,或采用经过审计的库(argon2/bcrypt),并确保密码不出现在日志中。
  • 结构化日志
    • 建议使用 AWS Lambda Powertools for TypeScript/Node.js 的 Logger/Tracing/Metric 组件,统一观测与追踪。
  • CORS
    • 统一在 API Gateway 层开启并限制到具体前端域名;函数仍返回CORS以增加兼容性。
  • 错误处理
    • 避免在 500 响应中暴露内部错误细节;在日志中记录 requestId 以便故障排查。
  • 冷启动与性能
    • 避免在顶层引入大型依赖;按需懒加载(如本模板对 nodemailer 的处理)。
    • 结合 Lambda 容量与超时设置,控制依赖体积,使用打包器(esbuild/webpack)裁剪。
  • 重试策略
    • HTTP 同步触发不会自动重试。对于需要保证达成的任务(如审计/邮件),建议引入异步事件/队列(EventBridge/SQS)并在下游重试。

以上模板可直接用于 AWS Lambda + API Gateway HTTP API 的注册端点,并提供校验、日志、错误、CORS、速率限制、超时与重试以及邮件发送的完整演示骨架。根据业务需要替换占位存储和邮件实现后即可投入使用。

函数模板概述

  • 平台类型:AWS Lambda(SQS 触发,S3 归档,Redis 做幂等)
  • 运行时环境:Python 3.9
  • 触发类型:消息队列(SQS,支持批处理与并发度配置,DLQ)

核心代码

# app.py
# -*- coding: utf-8 -*-
import json
import os
import time
import uuid
import logging
import traceback
from datetime import datetime, timezone
from typing import Any, Dict, List

import boto3
from botocore.exceptions import ClientError

try:
    import redis  # pip install redis
except Exception:
    redis = None  # 允许本地/冷启动缺依赖,部署时请确保打包依赖

# -------------------------
# 环境变量
# -------------------------
ARCHIVE_BUCKET = os.getenv("ARCHIVE_BUCKET", "")
REDIS_URL = os.getenv("REDIS_URL", "")
ALERT_WEBHOOK = os.getenv("ALERT_WEBHOOK", "")  # 可选
# 幂等已处理标记 TTL(秒)
PROCESSED_TTL_SECONDS = int(os.getenv("PROCESSED_TTL_SECONDS", "604800"))  # 默认7天
# 处理锁 TTL(秒)
LOCK_TTL_SECONDS = int(os.getenv("LOCK_TTL_SECONDS", "300"))  # 5分钟
# CloudWatch 指标命名空间
METRIC_NAMESPACE = os.getenv("METRIC_NAMESPACE", "OrderProcessor")
SERVICE_NAME = os.getenv("SERVICE_NAME", "order-created-consumer")

# -------------------------
# 全局客户端(复用连接)
# -------------------------
s3 = boto3.client("s3")
_lambda = boto3.client("lambda")  # 仅用于获取函数名等信息(可选)
_redis = None

logger = logging.getLogger()
logger.setLevel(logging.INFO)


# -------------------------
# 异常分类
# -------------------------
class TransientError(Exception):
    """可重试异常(网络波动、服务限流/暂时不可用等)"""
    pass


class PermanentError(Exception):
    """不可重试异常(数据校验失败、业务规则不满足等)"""
    pass


# -------------------------
# 工具函数
# -------------------------
def now_utc_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def parse_iso_date(s: str) -> datetime:
    # 尽量兼容 "2024-01-01T00:00:00Z" / "+00:00"
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    return datetime.fromisoformat(s)


def get_redis() -> "redis.Redis":
    global _redis
    if _redis is None:
        if not REDIS_URL:
            raise PermanentError("Missing REDIS_URL env")
        if redis is None:
            raise PermanentError("redis library not available; bundle dependency")
        _redis = redis.Redis.from_url(
            REDIS_URL,
            socket_timeout=2,
            socket_connect_timeout=2,
            health_check_interval=30,
            decode_responses=True,
        )
    return _redis


def is_retryable_aws_error(err: ClientError) -> bool:
    code = err.response.get("Error", {}).get("Code", "")
    # 常见可重试错误码
    retryable = {
        "RequestTimeout", "Throttling", "ThrottlingException", "SlowDown",
        "InternalError", "InternalServerError", "ServiceUnavailable",
        "ProvisionedThroughputExceededException"
    }
    if code in retryable:
        return True
    # 5xx 一般可重试
    try:
        return code.isdigit() and int(code) >= 500
    except Exception:
        return False


def safe_json_dumps(obj: Any) -> str:
    return json.dumps(obj, ensure_ascii=False, separators=(",", ":"))


def emit_emf_metrics(dimensions: Dict[str, str], metrics: Dict[str, float]):
    # CloudWatch Embedded Metric Format(EMF)
    # 参考:https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/CloudWatch_Embedded_Metric_Format.html
    metric_names = list(metrics.keys())
    emf = {
        "_aws": {
            "Timestamp": int(time.time() * 1000),
            "CloudWatchMetrics": [
                {
                    "Namespace": METRIC_NAMESPACE,
                    "Dimensions": [list(dimensions.keys())],
                    "Metrics": [{"Name": n, "Unit": "Count" if n.endswith("_count") else "Milliseconds"} for n in metric_names],
                }
            ],
        }
    }
    emf.update(dimensions)
    emf.update(metrics)
    print(safe_json_dumps(emf))


def send_alert(title: str, message: str, context: Dict[str, Any] = None):
    if not ALERT_WEBHOOK:
        return
    try:
        import urllib.request
        import urllib.error

        payload = {
            "title": title,
            "message": message,
            "service": SERVICE_NAME,
            "time": now_utc_iso(),
            "context": context or {},
        }
        data = safe_json_dumps(payload).encode("utf-8")
        req = urllib.request.Request(
            ALERT_WEBHOOK,
            data=data,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        with urllib.request.urlopen(req, timeout=2) as resp:
            resp.read()
    except Exception as e:
        logger.warning("alert_failed: %s", str(e))


def get_order_keys(order_id: str):
    return f"order:{order_id}:done", f"order:{order_id}:lock"


def acquire_lock(r: "redis.Redis", lock_key: str, token: str, ttl_sec: int) -> bool:
    # SET lock_key token NX EX ttl
    return bool(r.set(lock_key, token, nx=True, ex=ttl_sec))


def release_lock_if_owner(r: "redis.Redis", lock_key: str, token: str):
    # 原子释放锁(仅持有者可释放)
    lua = """
    if redis.call("GET", KEYS[1]) == ARGV[1] then
      return redis.call("DEL", KEYS[1])
    else
      return 0
    end
    """
    try:
        r.eval(lua, 1, lock_key, token)
    except Exception:
        pass


def parse_record_body(record: Dict[str, Any]) -> Dict[str, Any]:
    """
    支持:
    - 直接 SQS JSON: body = {"orderId": "...", ...}
    - SNS->SQS: body = {"Type":"Notification", "Message":"{...json...}"}
    """
    raw_body = record.get("body", "")
    try:
        body = json.loads(raw_body)
    except Exception:
        raise PermanentError("Message body is not valid JSON")

    # SNS wrap
    if isinstance(body, dict) and "Message" in body and "Type" in body:
        try:
            msg = json.loads(body["Message"])
        except Exception:
            raise PermanentError("SNS Message is not valid JSON")
        return msg
    elif isinstance(body, dict):
        return body
    else:
        raise PermanentError("Unsupported message body structure")


def validate_message(msg: Dict[str, Any]):
    required = ["orderId", "userId", "amount", "createdAt"]
    missing = [k for k in required if k not in msg]
    if missing:
        raise PermanentError(f"Missing fields: {missing}")

    # 基础类型与值校验
    if not isinstance(msg["orderId"], str) or not msg["orderId"]:
        raise PermanentError("orderId invalid")
    if not isinstance(msg["userId"], str) or not msg["userId"]:
        raise PermanentError("userId invalid")
    try:
        float(msg["amount"])
    except Exception:
        raise PermanentError("amount invalid")
    try:
        parse_iso_date(msg["createdAt"])
    except Exception:
        raise PermanentError("createdAt invalid ISO timestamp")


def archive_order_to_s3(msg: Dict[str, Any], s3_client, bucket: str, source_id: str):
    created_at = parse_iso_date(msg["createdAt"])
    y = created_at.strftime("%Y")
    m = created_at.strftime("%m")
    d = created_at.strftime("%d")
    order_id = msg["orderId"]

    # 归档 JSON
    archive_doc = {
        "orderId": msg["orderId"],
        "userId": msg["userId"],
        "amount": float(msg["amount"]),
        "createdAt": msg["createdAt"],
        "archivedAt": now_utc_iso(),
        "sourceMessageId": source_id,
        "status": "archived",
        "topic": "order.created",
        "version": 1,
    }

    key = f"order-created/y={y}/m={m}/d={d}/{order_id}.json"
    try:
        s3_client.put_object(
            Bucket=bucket,
            Key=key,
            Body=safe_json_dumps(archive_doc).encode("utf-8"),
            ContentType="application/json",
            CacheControl="no-store",
        )
    except ClientError as e:
        if is_retryable_aws_error(e):
            raise TransientError(f"S3 retryable error: {e}")
        raise PermanentError(f"S3 permanent error: {e}")
    return key


def process_one_record(record: Dict[str, Any], context) -> str:
    """
    返回状态:
    - "ok":成功归档或已处理
    - "skip":不可重试(已告警/隔离),当做成功确认消费
    - 抛出 TransientError:局部失败,触发重试
    """
    if not ARCHIVE_BUCKET:
        raise TransientError("ARCHIVE_BUCKET not configured")

    # 解析与校验
    msg = parse_record_body(record)
    validate_message(msg)

    order_id = msg["orderId"]
    message_id = record.get("messageId", str(uuid.uuid4()))
    approx_receive = int(record.get("attributes", {}).get("ApproximateReceiveCount", "1"))

    r = get_redis()
    processed_key, lock_key = get_order_keys(order_id)
    # 幂等:若已处理,直接返回成功
    if r.exists(processed_key):
        logger.info(safe_json_dumps({
            "event": "idempotent_hit",
            "orderId": order_id,
            "messageId": message_id
        }))
        return "ok"

    # 争抢处理锁,避免并发重复处理
    token = context.aws_request_id if getattr(context, "aws_request_id", None) else str(uuid.uuid4())
    if not acquire_lock(r, lock_key, token, LOCK_TTL_SECONDS):
        # 另一并发实例正在处理,返回可重试让其完成后再确认
        raise TransientError(f"processing_in_progress orderId={order_id}")

    try:
        # 归档到 S3
        key = archive_order_to_s3(msg, s3, ARCHIVE_BUCKET, message_id)

        # 标记已处理(幂等)
        r.set(processed_key, now_utc_iso(), ex=PROCESSED_TTL_SECONDS)

        logger.info(safe_json_dumps({
            "event": "archived",
            "orderId": order_id,
            "s3key": key,
            "messageId": message_id
        }))
        return "ok"
    except PermanentError as pe:
        # 不可重试:记录到错误前缀并告警,避免无谓重试
        try:
            quarantine_key = f"errors/order-created/{order_id}-{message_id}.json"
            s3.put_object(
                Bucket=ARCHIVE_BUCKET,
                Key=quarantine_key,
                Body=safe_json_dumps({
                    "error": str(pe),
                    "message": msg,
                    "archivedAt": now_utc_iso(),
                    "receiveCount": approx_receive,
                    "messageId": message_id
                }).encode("utf-8"),
                ContentType="application/json",
                CacheControl="no-store",
            )
        except Exception:
            # 隔离失败不影响主流程
            pass

        send_alert(
            title="[PermanentError] order.created",
            message=str(pe),
            context={"orderId": order_id, "messageId": message_id, "receiveCount": approx_receive},
        )
        logger.error(safe_json_dumps({
            "event": "permanent_error",
            "orderId": order_id,
            "messageId": message_id,
            "error": str(pe)
        }))
        # 不重试
        return "skip"
    finally:
        release_lock_if_owner(r, lock_key, token)


def handler(event, context):
    """
    SQS 触发的批处理 Lambda。
    - 支持局部失败(Partial Batch Response),仅失败项重试
    - 记录批处理耗时与计数指标(EMF)
    """
    batch_start = time.perf_counter()
    batch_received = len(event.get("Records", []))
    success_count = 0
    skip_count = 0
    transient_fail_count = 0
    permanent_fail_count = 0  # 被判定为永久失败但已跳过重试
    item_failures: List[Dict[str, str]] = []

    for record in event.get("Records", []):
        per_start = time.perf_counter()
        mid = record.get("messageId", "unknown")
        try:
            status = process_one_record(record, context)
            if status == "ok":
                success_count += 1
            elif status == "skip":
                skip_count += 1
        except TransientError as te:
            transient_fail_count += 1
            item_failures.append({"itemIdentifier": mid})
            logger.warning(safe_json_dumps({
                "event": "transient_error",
                "messageId": mid,
                "error": str(te)
            }))
            # 当即将进入 DLQ(如已达阈值)时发送告警
            try:
                approx_receive = int(record.get("attributes", {}).get("ApproximateReceiveCount", "1"))
            except Exception:
                approx_receive = 1
            if approx_receive >= 3:
                send_alert(
                    title="[RetryExhausted] order.created",
                    message=str(te),
                    context={"messageId": mid, "receiveCount": approx_receive},
                )
        except PermanentError as pe:
            permanent_fail_count += 1
            # 跳过重试,已在内部记录与隔离
        except Exception as e:
            # 未分类异常按可重试处理
            transient_fail_count += 1
            item_failures.append({"itemIdentifier": mid})
            logger.error(safe_json_dumps({
                "event": "unexpected_error",
                "messageId": mid,
                "error": str(e),
                "trace": traceback.format_exc()
            }))
        finally:
            per_dur_ms = (time.perf_counter() - per_start) * 1000.0
            print(safe_json_dumps({
                "event": "item_processed",
                "messageId": mid,
                "duration_ms": round(per_dur_ms, 2)
            }))

    batch_dur_ms = (time.perf_counter() - batch_start) * 1000.0

    # 输出 EMF 指标
    dims = {
        "service": SERVICE_NAME,
        "function": os.getenv("AWS_LAMBDA_FUNCTION_NAME", "unknown"),
        "stage": os.getenv("STAGE", "prod"),
    }
    emit_emf_metrics(
        dimensions=dims,
        metrics={
            "received_count": float(batch_received),
            "success_count": float(success_count),
            "skip_count": float(skip_count),
            "transient_fail_count": float(transient_fail_count),
            "permanent_fail_count": float(permanent_fail_count),
            "batch_duration_ms": round(batch_dur_ms, 2),
        },
    )

    # Partial Batch Response:仅重试失败项
    return {"batchItemFailures": item_failures}

requirements.txt

boto3==1.35.0
redis==4.6.0

配置说明

使用 AWS SAM 示例(可根据需要改为 CDK 或 Serverless Framework):

# template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Order Created Consumer - SQS -> Lambda -> S3 with Redis Idempotency

Parameters:
  ArchiveBucketName:
    Type: String
    Description: S3 bucket name for archiving order summaries
  RedisUrl:
    Type: String
    Description: Redis connection URL (e.g., rediss://user:pass@host:6379/0)
  AlertWebhook:
    Type: String
    Default: ""
    Description: Optional alert webhook URL
  BatchSize:
    Type: Number
    Default: 10
    MinValue: 1
    MaxValue: 10
  MaxConcurrency:
    Type: Number
    Default: 10
    MinValue: 2
    MaxValue: 100
  ProcessedTtlSeconds:
    Type: Number
    Default: 604800
  LockTtlSeconds:
    Type: Number
    Default: 300

Resources:
  OrderCreatedDLQ:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-created-dlq

  OrderCreatedQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: order-created
      RedrivePolicy:
        deadLetterTargetArn: !GetAtt OrderCreatedDLQ.Arn
        maxReceiveCount: 3  # 失败重试最多3次,超出进入DLQ

  OrderCreatedConsumer:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: order-created-consumer
      Runtime: python3.9
      Handler: app.handler
      Timeout: 30
      MemorySize: 512
      CodeUri: ./
      Environment:
        Variables:
          ARCHIVE_BUCKET: !Ref ArchiveBucketName
          REDIS_URL: !Ref RedisUrl
          ALERT_WEBHOOK: !Ref AlertWebhook
          METRIC_NAMESPACE: OrderPipeline
          SERVICE_NAME: order-created-consumer
          PROCESSED_TTL_SECONDS: !Ref ProcessedTtlSeconds
          LOCK_TTL_SECONDS: !Ref LockTtlSeconds
      Policies:
        - S3WritePolicy:
            BucketName: !Ref ArchiveBucketName
        - CloudWatchLogsFullAccess
      Events:
        SQSTrigger:
          Type: SQS
          Properties:
            Queue: !GetAtt OrderCreatedQueue.Arn
            BatchSize: !Ref BatchSize
            FunctionResponseTypes:
              - ReportBatchItemFailures
            ScalingConfig:
              MaximumConcurrency: !Ref MaxConcurrency
      # 如需访问 VPC 内的 Redis,请在此配置 VpcConfig(子网、安组)
      # VpcConfig:
      #   SecurityGroupIds: [sg-xxxxxxxx]
      #   SubnetIds: [subnet-aaaa, subnet-bbbb]

Outputs:
  QueueUrl:
    Value: !Ref OrderCreatedQueue
  DLQUrl:
    Value: !Ref OrderCreatedDLQ

说明:

  • 将外部事件源(如 SNS、Kafka、EventBridge)通过订阅/连接推送至 SQS 的 order-created 队列;Lambda 直接由 SQS 触发,从而具备批处理、并发度控制、DLQ 重投等能力。
  • RedrivePolicy 配置了 maxReceiveCount=3,与“失败重试最多3次,仍失败路由至 DLQ”一致。
  • FunctionResponseTypes=ReportBatchItemFailures 启用局部失败返回,保证仅失败消息重试。

使用指南

  • 打包与部署
    • 将 app.py 与 requirements.txt 放入同一目录,使用 SAM 打包部署:
      • sam build --use-container
      • sam deploy --guided
    • 确保部署时提供参数 ArchiveBucketName、RedisUrl(建议使用 Secrets Manager + Lambda 环境变量引用,不要硬编码凭据)。
  • 消息格式
    • 直接 SQS 消息 body 或 SNS->SQS 包装均可,最终消息 JSON 必须包含:
      • orderId: string
      • userId: string
      • amount: number
      • createdAt: ISO 时间戳(例如 2024-01-01T00:00:00Z)
  • 幂等处理
    • 以 orderId 为粒度,Redis 通过 processed_key 标记已处理,TTL 默认 7 天可配。
    • 并发争抢使用短期锁 lock_key;若有并发,当前消息将标记为可重试,避免重复归档。
  • 归档策略
    • 成功将订单摘要以 JSON 写入 S3:order-created/y=YYYY/m=MM/d=DD/{orderId}.json
    • 不可重试(PermanentError)消息将隔离到 errors/order-created/ 前缀,并发送告警。
  • 失败与重试
    • 可重试(TransientError)返回局部失败,仅该消息重试;超过 3 次将由 SQS 自动投递至 DLQ。
    • 不可重试错误不再重试(函数内“skip”处理),避免冲击系统;已记录到隔离区并告警。
  • 指标与日志
    • 每个批次输出 EMF 指标:received_count、success_count、skip_count、transient_fail_count、permanent_fail_count、batch_duration_ms。
    • 每条消息输出处理耗时(item_processed)。
    • 可在 CloudWatch Logs 中基于 EMF 自动生成指标图表和报警。

最佳实践建议

  • 安全与合规
    • 不在代码中硬编码凭据;使用 Secrets Manager 或 SSM Parameter Store 注入 Redis 凭据,结合 KMS 加密。
    • 若 Redis 在 VPC 内,配置最小权限安全组与子网隔离。
    • S3 桶开启默认加密(SSE-KMS),并配置最小权限的 IAM Policy。
  • 稳定性与性能
    • 合理设置 BatchSize(1~10)与 MaximumConcurrency,确保下游(S3/Redis)不被压垮;可结合 SQS 可见性超时进行容量规划。
    • 使用 Partial Batch Response 降低“全批失败”导致的放大重试。
    • 根据业务重复率调优 PROCESSED_TTL_SECONDS,平衡幂等缓存命中率与存储成本。
    • 将 Lambda 超时设置为略高于单批最大预期处理耗时。
  • 监控与告警
    • 基于 EMF 指标创建 CloudWatch Alarm:高 transient_fail_count、permanent_fail_count、DLQ 消息积压。
    • 对 DLQ 配置可观测管道(订阅 Lambda/告警)以便及时处置“坏消息”。
  • 可靠性
    • SQS RedrivePolicy 设置 maxReceiveCount=3 与需求一致;如存在上游抖动,建议从小到大逐步调优。
    • 对 S3 的 5xx/SlowDown 等错误进行重试分类(已内置),必要时可加入指数退避。
  • 可维护性
    • 将 schema 校验独立为模块,复杂校验可引入 pydantic/fastjsonschema(注意打包体积)。
    • 按需扩展归档对象字段与版本号,保持向后兼容。
    • 使用 CI/CD(如 GitHub Actions + SAM)进行自动化测试与部署。

如需改为 Azure Functions(Service Bus)或 GCP(Pub/Sub + Cloud Storage),我可在保持相同业务语义下输出对应平台模板。

函数模板概述

  • 平台类型:AWS Lambda(S3 事件触发)
  • 运行时环境:go1.x
  • 触发类型:对象存储(Amazon S3:ObjectCreated*,前缀 images/,后缀 jpg/jpeg/png)

核心代码

// 文件:main.go
// 功能:当 S3 bucket 的 images/ 前缀有 JPG/PNG 新文件上传时,生成 200x200 居中裁剪的 JPEG 缩略图(质量80),写回到 OUTPUT_PREFIX(默认 thumbnails/)同名路径;
// - 若目标已存在则跳过
// - 对非图片或超过 MAX_SIZE_MB 的文件记录告警并结束
// - 输出结构化日志和耗时指标
// 环境变量:
//   OUTPUT_PREFIX  (默认 "thumbnails/")
//   MAX_SIZE_MB    (默认 10)
//   CONCURRENCY    (默认 5)

package main

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"image"
	"image/jpeg"
	_ "image/png" // 注册 PNG 解码
	"io"
	"net/url"
	"os"
	"path"
	"strings"
	"sync"
	"time"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambdacontext"
	"github.com/aws/aws-lambda-go/lambda"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	s3 "github.com/aws/aws-sdk-go-v2/service/s3"
	s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
	smithy "github.com/aws/smithy-go"
	smithyhttp "github.com/aws/smithy-go/transport/http"

	"github.com/disintegration/imaging"
	"github.com/rwcarlsen/goexif/exif"
)

const (
	sourcePrefix = "images/"
	thumbSize    = 200
	jpgQuality   = 80
)

type AppConfig struct {
	OutputPrefix string
	MaxSizeBytes int64
	Concurrency  int
}

var (
	s3cl  *s3.Client
	appcf AppConfig
)

func init() {
	// 初始化 SDK 客户端(复用连接)
	ctx := context.Background()
	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		panic(fmt.Errorf("load aws config: %w", err))
	}
	s3cl = s3.NewFromConfig(cfg)
	appcf = loadConfig()
}

func main() {
	lambda.Start(handler)
}

func handler(ctx context.Context, evt events.S3Event) error {
	start := time.Now()
	reqID := requestIDFromCtx(ctx)

	logJSON("info", "invocation_start", map[string]any{
		"request_id": reqID,
		"records":    len(evt.Records),
		"config": map[string]any{
			"output_prefix": appcf.OutputPrefix,
			"max_size_bytes": appcf.MaxSizeBytes,
			"concurrency":    appcf.Concurrency,
		},
	})

	sem := make(chan struct{}, appcf.Concurrency)
	var wg sync.WaitGroup
	errs := make(chan error, len(evt.Records))

	for _, r := range evt.Records {
		rec := r // 避免闭包变量捕获问题
		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer func() {
				<-sem
				wg.Done()
			}()
			if err := processRecord(ctx, rec); err != nil {
				errs <- err
			}
		}()
	}

	wg.Wait()
	close(errs)

	var firstErr error
	for e := range errs {
		if firstErr == nil {
			firstErr = e
		}
		// 继续收集但仅返回第一个错误(可按需修改为汇总)
	}

	logJSON("info", "invocation_end", map[string]any{
		"request_id": reqID,
		"duration_ms": time.Since(start).Milliseconds(),
		"error":       errToStr(firstErr),
	})
	return firstErr
}

func processRecord(ctx context.Context, rec events.S3EventRecord) error {
	start := time.Now()

	bucket := rec.S3.Bucket.Name
	keyEnc := rec.S3.Object.Key
	key, _ := url.QueryUnescape(keyEnc)
	key = strings.ReplaceAll(key, "+", " ") // 适配某些编码形式

	ext := strings.ToLower(path.Ext(key))
	isAllowed := ext == ".jpg" || ext == ".jpeg" || ext == ".png"

	// 仅处理 images/ 前缀
	if !strings.HasPrefix(key, sourcePrefix) {
		logJSON("debug", "skip_non_matching_prefix", map[string]any{
			"bucket": bucket, "key": key,
		})
		return nil
	}
	if !isAllowed {
		logJSON("warn", "skip_non_image_extension", map[string]any{
			"bucket": bucket, "key": key, "ext": ext,
		})
		return nil
	}

	// 目标 Key 计算
	suffix := strings.TrimPrefix(key, sourcePrefix)
	dstKey := path.Clean(appcf.OutputPrefix + suffix)

	// 若目标已存在则跳过
	exists, err := objectExists(ctx, bucket, dstKey)
	if err != nil {
		logJSON("error", "head_destination_failed", map[string]any{
			"bucket": bucket, "key": dstKey, "error": err.Error(),
		})
		return err
	}
	if exists {
		logJSON("info", "skip_already_exists", map[string]any{
			"bucket": bucket, "src_key": key, "dst_key": dstKey,
		})
		return nil
	}

	// 源对象大小检查(先 HEAD 再 GET,避免读大对象)
	head, err := s3cl.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		if isNotFound(err) {
			logJSON("warn", "source_not_found", map[string]any{
				"bucket": bucket, "key": key,
			})
			return nil
		}
		return fmt.Errorf("head source: %w", err)
	}
	srcSize := int64(0)
	if head.ContentLength != nil {
		srcSize = *head.ContentLength
	}
	if appcf.MaxSizeBytes > 0 && srcSize > appcf.MaxSizeBytes {
		logJSON("warn", "skip_exceed_max_size", map[string]any{
			"bucket": bucket, "key": key, "size_bytes": srcSize, "max_size_bytes": appcf.MaxSizeBytes,
		})
		return nil
	}

	// 读取对象(再限流一次,防止 HEAD 与实际不一致)
	get, err := s3cl.GetObject(ctx, &s3.GetObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err != nil {
		if isNotFound(err) {
			logJSON("warn", "source_not_found_get", map[string]any{
				"bucket": bucket, "key": key,
			})
			return nil
		}
		return fmt.Errorf("get source: %w", err)
	}
	defer get.Body.Close()

	maxBytes := appcf.MaxSizeBytes
	if maxBytes <= 0 {
		maxBytes = 10 << 20 // 安全兜底:10MB
	}
	var buf bytes.Buffer
	lr := &io.LimitedReader{R: get.Body, N: maxBytes + 1}
	n, err := io.Copy(&buf, lr)
	if err != nil {
		return fmt.Errorf("read source: %w", err)
	}
	if n > maxBytes {
		logJSON("warn", "skip_exceed_max_size_stream", map[string]any{
			"bucket": bucket, "key": key, "read_bytes": n, "max_size_bytes": maxBytes,
		})
		return nil
	}

	// 解码并尊重 EXIF 方向
	// 说明:对 JPEG 读取 EXIF Orientation 并将图像内容转换为正确方向(最终 JPEG 不保证完整保留所有 EXIF,避免磁盘/内存开销)
	srcImg, format, err := image.Decode(bytes.NewReader(buf.Bytes()))
	if err != nil {
		logJSON("warn", "decode_failed_not_image", map[string]any{
			"bucket": bucket, "key": key, "error": err.Error(),
		})
		return nil // 非图片:记录告警并结束
	}

	if format == "jpeg" {
		if ex, err := exif.Decode(bytes.NewReader(buf.Bytes())); err == nil && ex != nil {
			srcImg = applyEXIFOrientation(srcImg, ex)
		}
	}

	// 生成 200x200 居中裁剪缩略图(Lanczos,质量80)
	thumb := imaging.Fill(srcImg, thumbSize, thumbSize, imaging.Center, imaging.Lanczos)

	var out bytes.Buffer
	if err := jpeg.Encode(&out, thumb, &jpeg.Options{Quality: jpgQuality}); err != nil {
		return fmt.Errorf("encode jpeg: %w", err)
	}

	// 写回 S3(ContentType: image/jpeg)
	_, err = s3cl.PutObject(ctx, &s3.PutObjectInput{
		Bucket:      aws.String(bucket),
		Key:         aws.String(dstKey),
		Body:        bytes.NewReader(out.Bytes()),
		ContentType: aws.String("image/jpeg"),
		// 如需加密可启用(示例:SSE-S3)
		// ServerSideEncryption: s3types.ServerSideEncryptionAes256,
		Metadata: map[string]string{
			"x-source-key": key,
		},
	})
	if err != nil {
		return fmt.Errorf("put destination: %w", err)
	}

	logJSON("info", "thumb_generated", map[string]any{
		"bucket":        bucket,
		"src_key":       key,
		"dst_key":       dstKey,
		"src_bytes":     n,
		"dst_bytes":     out.Len(),
		"duration_ms":   time.Since(start).Milliseconds(),
		"format":        format,
		"jpeg_quality":  jpgQuality,
		"thumb_size_px": fmt.Sprintf("%dx%d", thumbSize, thumbSize),
	})

	return nil
}

func objectExists(ctx context.Context, bucket, key string) (bool, error) {
	_, err := s3cl.HeadObject(ctx, &s3.HeadObjectInput{
		Bucket: aws.String(bucket),
		Key:    aws.String(key),
	})
	if err == nil {
		return true, nil
	}
	if isNotFound(err) {
		return false, nil
	}
	return false, err
}

func isNotFound(err error) bool {
	// 兼容多种 404/NoSuchKey
	var re *smithyhttp.ResponseError
	if errors.As(err, &re) {
		if re.HTTPStatusCode() == 404 {
			return true
		}
	}
	var apiErr smithy.APIError
	if errors.As(err, &apiErr) {
		code := apiErr.ErrorCode()
		if code == "NotFound" || code == "NoSuchKey" {
			return true
		}
	}
	var nfe *s3types.NotFound
	if errors.As(err, &nfe) {
		return true
	}
	return false
}

func loadConfig() AppConfig {
	maxMB := parseInt64(getEnv("MAX_SIZE_MB", "10"))
	con := parseInt(getEnv("CONCURRENCY", "5"))
	prefix := getEnv("OUTPUT_PREFIX", "thumbnails/")

	// 规范化前缀,确保不以 "/" 开头,以 "/" 结尾
	prefix = strings.TrimPrefix(prefix, "/")
	if !strings.HasSuffix(prefix, "/") {
		prefix += "/"
	}
	if prefix == sourcePrefix {
		// 防止回写触发环路
		prefix = "thumbnails/"
	}

	return AppConfig{
		OutputPrefix: prefix,
		MaxSizeBytes: maxMB << 20,
		Concurrency:  con,
	}
}

func getEnv(k, def string) string {
	v := strings.TrimSpace(os.Getenv(k))
	if v == "" {
		return def
	}
	return v
}
func parseInt(s string) int {
	n := 0
	fmt.Sscanf(s, "%d", &n)
	if n <= 0 {
		n = 1
	}
	return n
}
func parseInt64(s string) int64 {
	var n int64
	fmt.Sscanf(s, "%d", &n)
	if n <= 0 {
		n = 1
	}
	return n
}

func applyEXIFOrientation(img image.Image, ex *exif.Exif) image.Image {
	tag, err := ex.Get(exif.Orientation)
	if err != nil || tag == nil {
		return img
	}
	orient, err := tag.Int(0)
	if err != nil {
		return img
	}
	switch orient {
	case 1:
		// 正常
		return img
	case 2:
		// 水平镜像
		return imaging.FlipH(img)
	case 3:
		// 180
		return imaging.Rotate180(img)
	case 4:
		// 垂直镜像
		return imaging.FlipV(img)
	case 5:
		// 水平镜像 + 270 CW
		return imaging.Rotate270(imaging.FlipH(img))
	case 6:
		// 90 CW
		return imaging.Rotate270(imaging.Rotate180(img)) // 等效为 Rotate90,但 imaging 没有直接 Rotate90? 有,使用 Rotate90 更直观:
		// return imaging.Rotate90(img)
	case 7:
		// 水平镜像 + 90 CW
		return imaging.Rotate90(imaging.FlipH(img))
	case 8:
		// 270 CW
		return imaging.Rotate270(img)
	default:
		return img
	}
}

func requestIDFromCtx(ctx context.Context) string {
	if lc, ok := lambdacontext.FromContext(ctx); ok && lc != nil {
		return lc.AwsRequestID
	}
	return ""
}

func errToStr(e error) string {
	if e == nil {
		return ""
	}
	return e.Error()
}

func logJSON(level, msg string, fields map[string]any) {
	if fields == nil {
		fields = map[string]any{}
	}
	fields["level"] = level
	fields["msg"] = msg
	fields["ts"] = time.Now().Format(time.RFC3339Nano)
	b, _ := json.Marshal(fields)
	fmt.Println(string(b))
}

备注:

  • 上面 applyEXIFOrientation 中对 90/270 度可直接使用 imaging.Rotate90/Rotate270。若使用 Rotate90 更直观,可将 case 6 改为 imaging.Rotate90(img)。

配置说明

  • 触发配置(S3 -> Lambda 事件通知):
    • 事件类型:ObjectCreated:Put, Post, Copy, CompleteMultipartUpload
    • 前缀过滤:images/
    • 后缀过滤(建议):.jpg, .jpeg, .png
  • 环境变量:
    • OUTPUT_PREFIX:缩略图输出前缀(默认 thumbnails/,将写入 thumbnails/<原相对路径>)
    • MAX_SIZE_MB:允许处理的源文件最大大小(默认 10MB)
    • CONCURRENCY:单次事件内并发处理的记录数量(默认 5)
  • IAM 权限(最小权限示例,替换为实际 bucket 名称):
    • s3:GetObject, s3:HeadObject 对 arn:aws:s3:::YOUR_BUCKET/images/*
    • s3:PutObject, s3:HeadObject 对 arn:aws:s3:::YOUR_BUCKET/thumbnails/*
    • logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents(CloudWatch Logs)
  • 部署构建(Go 1.x runtime):
    • go.mod(示例):
      • module example.com/thumbnailer
      • require:
        • github.com/aws/aws-lambda-go v1.x
        • github.com/aws/aws-sdk-go-v2 v1.x
        • github.com/aws/aws-sdk-go-v2/config v1.x
        • github.com/aws/aws-sdk-go-v2/service/s3 v1.x
        • github.com/rwcarlsen/goexif v0.x
        • github.com/disintegration/imaging v1.x
    • 构建与打包:
      • 环境:GOOS=linux GOARCH=amd64(或 arm64 与对应 Lambda 架构匹配)
      • 命令:GOOS=linux GOARCH=amd64 go build -ldflags="-s -w" -o bootstrap .
      • 打包:zip function.zip bootstrap
      • 运行时:go1.x(如转为自定义运行时,可使用 provided.al2)
  • Lambda 基本配置建议:
    • 内存:512–1024 MB(图像处理需要一定 CPU,内存越高,CPU 配额越高,速度更快)
    • 超时:30 秒以上(视图片大小与并发调整)
    • 保持并发限制:使用函数级并发上限避免过载后端或 S3 请求阈值

使用指南

  1. 创建或选择 S3 Bucket,并启用事件通知:
    • 触发条件配置为 images/ 前缀、后缀 .jpg/.jpeg/.png
  2. 部署 Lambda:
    • 设置环境变量 OUTPUT_PREFIX=thumbnails/(或自定义)
    • 设置 MAX_SIZE_MB 和 CONCURRENCY
    • 赋予最小权限的执行角色
  3. 测试:
    • 向 s3:///images/ 目录上传 jpg/png 文件
    • 观察 CloudWatch Logs 的结构化日志与耗时指标
    • 验证 s3:///<OUTPUT_PREFIX>/ 同名相对路径生成了 200x200 JPEG 缩略图
  4. 行为说明:
    • 非 jpg/png 文件:记录 warn 日志并结束,不报错
    • 超过 MAX_SIZE_MB:记录 warn 日志并结束
    • 目标已存在:跳过并记录 info
    • 仅处理 images/ 前缀对象,避免对其他对象误处理

最佳实践建议

  • 性能优化
    • 增加 Lambda 内存以提升 CPU,图像处理将明显加速;在 512MB–1024MB 区间权衡成本与性能
    • 使用事件过滤(前缀/后缀)从源头减少无效触发
    • 设置合理 CONCURRENCY:经验值为 floor(可用内存 / (单图峰值内存×安全系数))。单图峰值内存近似为 原图大小×3~5(解码后像素内存 + 处理中间态)。例如:
      • 512MB 内存,单图峰值约 30MB,则建议并发 6~10;根据实际 CloudWatch 指标微调
    • 尽量复用 SDK 客户端(已在 init 中完成)
    • 使用 HeadObject 预检大小避免读取大对象
  • 稳定性与幂等
    • 目标对象存在即跳过,避免重复写入;如需更强幂等,可记录源对象 ETag 到目标元数据并在二次校验时对比
    • 建议在 S3 触发端增加 .jpg/.jpeg/.png 后缀过滤,减少非图片触发
  • 安全
    • 禁止硬编码凭证;使用 IAM 角色并授予最小权限
    • 需要合规时启用服务端加密(SSE-S3 或 SSE-KMS),并在 PutObject 设置加密参数
  • 监控与告警
    • 使用结构化日志便于创建 CloudWatch Logs Metric Filter:
      • msg=skip_exceed_max_size 计数作为告警阈值
      • msg=thumb_generated 统计成功率和平均耗时
    • 结合 Lambda 内置指标(Duration、Errors、Throttles、ConcurrentExecutions)进行容量规划
  • 方向与元数据
    • 代码已依据 EXIF Orientation 调整图像内容方向,保证显示正确;若需完整保留 EXIF 或复制源 EXIF,请引入专门 JPEG/EXIF 写入库并在编码后写回所需字段(注意额外内存/耗时开销)
  • 成本控制
    • 通过合理的 CONCURRENCY 和事件过滤减少无效处理
    • 若高并发场景下遇到 S3 限速,可加入指数退避重试或降低并发

如需移植到其他对象存储平台(如 GCS/Azure Blob/阿里云 OSS),仅需替换存储 SDK 及事件模型,核心图像处理与并发框架可原样复用。

示例详情

解决的问题

以最少输入,在几分钟内产出可直接落地的无服务器函数模板,显著缩短从想法到可运行样例的周期。

  • 根据“运行环境 + 触发方式 + 函数逻辑”一键生成高质量模板与配套说明
  • 自动补齐日志、错误处理、配置占位与性能建议,减少线上隐患与返工
  • 覆盖主流云平台与多种运行环境,降低多云迁移与协作成本
  • 适配事件处理、数据流水线、物联网接入、Web 后端等高频场景,快速搭建标准化骨架
  • 帮助个人开发者快速上手,帮助团队沉淀规范与复用资产,持续提升交付效率与一致性

适用用户

初创团队CTO/技术负责人

用它快速推出后端接口与事件处理,建立统一模板库,带新人在一周内搭起可用后端;压缩试错与云账单,聚焦核心业务验证。

云架构师/平台工程师

制定企业级无服务器规范与最佳实践,一键生成跨平台模板,统一日志与告警方案;用于治理多团队代码风格与成本。

全栈开发者/自由职业者

在多项目中快速交付第三方回调、表单处理与轻量接口;减少重复体力活,把精力放在业务逻辑与用户体验。

特征总结

按运行环境与触发类型,秒级生成规范化函数模板,大幅减少搭建时间。
自动补齐日志、错误处理与重试策略,并提供可追踪说明,降低线上故障风险。
一键适配多云平台与多语言运行时,免去重复迁移与改写,保持团队代码风格一致。
根据业务描述提炼输入输出流程,自动搭建清晰结构与注释,方便团队后续协作与扩展。
自带部署配置与使用指南,团队新人也能快速上手交付,减少口头传授与反复问答成本。
按需参数化模板,复用在事件处理、后端接口、数据流转等常见场景。
自动检查易踩坑点与弃用能力,提前规避风险,确保合规与长期可维护,持续降低维护成本。
结合性能建议与资源配额,默认给出更省时省费的实现,避免无谓膨胀与突发账单。
支持迭代完善逻辑,保留上下文,连续优化到可直接上线,同时减少跨人协作沟通摩擦。
可生成示例触发器与本地调试脚本,缩短联调与测试周期,提升交付质量与上线信心。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

无服务器函数模板生成器

6
1
Dec 30, 2025
本提示词专为开发者设计,能够根据指定的运行环境、触发类型和函数逻辑,快速生成符合最佳实践的无服务器函数模板代码。通过智能分析用户需求,自动构建可复用、可定制的代码框架,显著提升开发效率,减少重复编码工作,确保代码质量和一致性,支持多种云平台和运行环境的无缝集成。
成为会员,解锁全站资源
复制与查看不限次 · 持续更新权益
提示词宝典 · 终身会员

一次支付永久解锁,全站资源与持续更新;商业项目无限次使用

420 +
品类
8200 +
模板数量
17000 +
会员数量