热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
根据用户提供的功能需求与编程语言,快速生成高质量、结构清晰、可读性强的代码片段。支持自定义代码复杂度、输出形式及注释选项,适用于开发者在时间紧迫或需要标准化代码模板的场景下,高效完成功能模块开发。
{
"generated_code": "==================== Python (FastAPI) - 用户注册/登录与会话管理 API 处理器 + 示例 ====================\n# requirements.txt(参考)\n# fastapi==0.111.0\n# uvicorn==0.30.1\n# passlib[bcrypt]==1.7.4\n# python-jose==3.3.0\n# pydantic==2.8.2\n# pytest==8.3.2\n# httpx==0.27.0\n# python-json-logger==2.0.7\n\n# main.py\nfrom fastapi import FastAPI, Depends, Request\nfrom fastapi.responses import JSONResponse\nfrom fastapi.security import HTTPBearer, HTTPAuthorizationCredentials\nfrom pydantic import BaseModel, EmailStr, Field\nfrom typing import Optional, Dict, Any\nfrom datetime import datetime, timedelta, timezone\nfrom jose import jwt, JWTError\nfrom passlib.hash import bcrypt\nimport re\nimport uuid\nimport logging\nfrom pythonjsonlogger import jsonlogger\nimport time\n\n# ---------------------- 配置区 ----------------------\nJWT_SECRET = "dev_super_secret_change_me" # 在生产环境中使用安全的密钥(环境变量/密钥管理)\nJWT_ALG = "HS256"\nACCESS_TOKEN_EXPIRE_MIN = 15\nREFRESH_TOKEN_EXPIRE_DAYS = 7\n\n# 登录限流与锁定策略\nLOGIN_WINDOW_SECONDS = 15 * 60\nLOGIN_MAX_ATTEMPTS = 5\nLOCKOUT_SECONDS = 15 * 60\n\n# 通用速率限制(每IP,每路由)\nRATE_LIMIT_WINDOW_SECONDS = 60\nRATE_LIMIT_MAX = 60\n\n# 统一错误码常量\nERR_VALIDATION = "VALIDATION_ERROR"\nERR_AUTH_INVALID = "AUTH_INVALID_CREDENTIALS"\nERR_AUTH_LOCKED = "USER_LOCKED"\nERR_RATE_LIMIT = "RATE_LIMITED"\nERR_EMAIL_NOT_VERIFIED = "EMAIL_NOT_VERIFIED"\nERR_TOKEN_INVALID = "TOKEN_INVALID"\nERR_TOKEN_EXPIRED = "TOKEN_EXPIRED"\nERR_CONFLICT = "RESOURCE_CONFLICT"\nERR_NOT_FOUND = "NOT_FOUND"\n\n# ---------------------- 结构化日志 ----------------------\nlogger = logging.getLogger("auth_api")\nlogger.setLevel(logging.INFO)\nhandler = logging.StreamHandler()\nformatter = jsonlogger.JsonFormatter("%(asctime)s %(levelname)s %(message)s")\nhandler.setFormatter(formatter)\nlogger.addHandler(handler)\n\n# ---------------------- 简易内存存储(演示用,生产应使用数据库/缓存) ----------------------\nusers: Dict[str, Dict[str, Any]] = {} # key: email -> {id, email, password_hash, is_verified, created_at}\nverification_tokens: Dict[str, Dict[str, Any]] = {} # token -> {email, exp}\nreset_tokens: Dict[str, Dict[str, Any]] = {} # token -> {email, exp}\nrefresh_allowlist: Dict[str, Dict[str, Any]] = {} # jti -> {email, exp}\n\n# 登录失败与锁定状态: email -> {count, first_ts, locked_until}\nlogin_attempts: Dict[str, Dict[str, float]] = {}\n\n# 简单的每IP每路由速率限制计数: (ip, route) -> {count, first_ts}\nrate_limit_state: Dict[str, Dict[str, float]] = {}\n\n# ---------------------- 工具函数 ----------------------\ndef error_response(code: str, message: str, http_status: int = 400, details: Optional[dict] = None):\n payload = {"error": {"code": code, "message": message}}\n if details:\n payload["error"]["details"] = details\n return JSONResponse(status_code=http_status, content=payload)\n\n# 密码强度检验:至少8位,包含大写、小写、数字、特殊符号\ndef is_strong_password(pw: str) -> bool:\n if len(pw) < 8:\n return False\n patterns = [r"[A-Z]", r"[a-z]", r"\d", r"[^A-Za-z0-9]"]\n return all(re.search(p, pw) for p in patterns)\n\n# 发送邮件(演示用:写日志代替)\ndef send_email(to: str, subject: str, body: str):\n logger.info({"event": "send_email", "to": to, "subject": subject, "body": body})\n\n# JWT 生成与验证\ndef create_access_token(sub: str) -> str:\n now = datetime.now(timezone.utc)\n exp = now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MIN)\n payload = {"sub": sub, "exp": int(exp.timestamp()), "iat": int(now.timestamp()), "type": "access"}\n return jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)\n\ndef create_refresh_token(sub: str) -> str:\n now = datetime.now(timezone.utc)\n exp = now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS)\n jti = str(uuid.uuid4())\n payload = {"sub": sub, "exp": int(exp.timestamp()), "iat": int(now.timestamp()), "type": "refresh", "jti": jti}\n token = jwt.encode(payload, JWT_SECRET, algorithm=JWT_ALG)\n refresh_allowlist[jti] = {"email": sub, "exp": exp.timestamp()}\n return token\n\n# 校验与解析 JWT(返回payload 或错误)\ndef decode_token(token: str) -> Dict[str, Any]:\n try:\n payload = jwt.decode(token, JWT_SECRET, algorithms=[JWT_ALG])\n return payload\n except JWTError as e:\n raise ValueError(str(e))\n\n# 刷新令牌轮换:旧 jti 失效\ndef rotate_refresh_token(old_token: str) -> Optional[str]:\n try:\n payload = decode_token(old_token)\n except ValueError:\n return None\n if payload.get("type") != "refresh":\n return None\n jti = payload.get("jti")\n if not jti or jti not in refresh_allowlist:\n return None\n # 检查是否过期\n if refresh_allowlist[jti]["exp"] < time.time():\n refresh_allowlist.pop(jti, None)\n return None\n email = payload.get("sub")\n # 使旧的失效\n refresh_allowlist.pop(jti, None)\n # 创建新的\n return create_refresh_token(email)\n\n# 简单速率限制(每IP+路由)\ndef check_rate_limit(ip: str, route: str) -> bool:\n key = f"{ip}:{route}"\n now = time.time()\n state = rate_limit_state.get(key)\n if not state or now - state["first_ts"] > RATE_LIMIT_WINDOW_SECONDS:\n rate_limit_state[key] = {"count": 1, "first_ts": now}\n return True\n if state["count"] >= RATE_LIMIT_MAX:\n return False\n state["count"] += 1\n return True\n\n# 登录失败计数与锁定\ndef register_login_failure(email: str):\n now = time.time()\n st = login_attempts.get(email)\n if not st or now - st.get("first_ts", now) > LOGIN_WINDOW_SECONDS:\n login_attempts[email] = {"count": 1, "first_ts": now, "locked_until": 0}\n else:\n st["count"] += 1\n if st["count"] >= LOGIN_MAX_ATTEMPTS:\n st["locked_until"] = now + LOCKOUT_SECONDS\n\n\ndef is_locked(email: str) -> bool:\n st = login_attempts.get(email)\n if not st:\n return False\n if st.get("locked_until", 0) > time.time():\n return True\n return False\n\n\ndef reset_login_counter(email: str):\n if email in login_attempts:\n login_attempts[email] = {"count": 0, "first_ts": time.time(), "locked_until": 0}\n\n# ---------------------- Pydantic 模型 ----------------------\nclass RegisterRequest(BaseModel):\n email: EmailStr\n password: str = Field(min_length=8)\n\nclass LoginRequest(BaseModel):\n email: EmailStr\n password: str\n\nclass TokenResponse(BaseModel):\n token_type: str = "Bearer"\n access_token: str\n access_expires_in: int\n refresh_token: str\n refresh_expires_in: int\n\nclass ForgotPasswordRequest(BaseModel):\n email: EmailStr\n\nclass ResetPasswordRequest(BaseModel):\n token: str\n new_password: str = Field(min_length=8)\n\nclass UserResponse(BaseModel):\n id: str\n email: EmailStr\n is_verified: bool\n\nsecurity = HTTPBearer(auto_error=False)\n\n# ---------------------- FastAPI 应用与中间件 ----------------------\napp = FastAPI(title="Auth API", version="1.0.0")\n\n@app.middleware("http")\nasync def logging_middleware(request: Request, call_next):\n # 简单结构化日志,将请求方法、路径与客户端IP记录\n ip = request.client.host if request.client else "-"\n route = request.url.path\n if not check_rate_limit(ip, route):\n return error_response(ERR_RATE_LIMIT, "Too many requests", 429)\n start = time.time()\n response = await call_next(request)\n duration = int((time.time() - start) * 1000)\n logger.info({\n "event": "request",\n "method": request.method,\n "path": route,\n "status": response.status_code,\n "ip": ip,\n "duration_ms": duration\n })\n return response\n\n# ---------------------- 依赖项:认证 ----------------------\ndef get_current_user(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[Dict[str, Any]]:\n if not credentials:\n return None\n token = credentials.credentials\n try:\n payload = decode_token(token)\n if payload.get("type") != "access":\n return None\n email = payload.get("sub")\n user = users.get(email)\n return user\n except Exception:\n return None\n\n# ---------------------- 路由实现 ----------------------\n@app.post("/auth/register")\nasync def register(req: RegisterRequest):\n email = req.email.lower()\n if email in users:\n return error_response(ERR_CONFLICT, "Email already registered", 409)\n if not is_strong_password(req.password):\n return error_response(ERR_VALIDATION, "Password too weak: require upper, lower, digit and symbol")\n user_id = str(uuid.uuid4())\n users[email] = {\n "id": user_id,\n "email": email,\n "password_hash": bcrypt.hash(req.password),\n "is_verified": False,\n "created_at": datetime.now(timezone.utc).isoformat()\n }\n # 生成验证邮件\n token = str(uuid.uuid4())\n verification_tokens[token] = {"email": email, "exp": (datetime.now(timezone.utc) + timedelta(hours=24)).timestamp()}\n verification_link = f"https://example.com/verify?token={token}" # 实际应使用后端GET /auth/verify\n send_email(email, "Verify your account", f"Click to verify: {verification_link}")\n return {"message": "Registered. Please verify your email."}\n\n@app.get("/auth/verify")\nasync def verify_email(token: str):\n data = verification_tokens.get(token)\n if not data:\n return error_response(ERR_TOKEN_INVALID, "Invalid verification token", 400)\n if data["exp"] < time.time():\n verification_tokens.pop(token, None)\n return error_response(ERR_TOKEN_EXPIRED, "Verification token expired", 400)\n email = data["email"]\n user = users.get(email)\n if not user:\n return error_response(ERR_NOT_FOUND, "User not found", 404)\n user["is_verified"] = True\n verification_tokens.pop(token, None)\n return {"message": "Email verified"}\n\n@app.post("/auth/login", response_model=TokenResponse)\nasync def login(req: LoginRequest):\n email = req.email.lower()\n if is_locked(email):\n return error_response(ERR_AUTH_LOCKED, "Account locked due to repeated failures. Try later.", 423)\n user = users.get(email)\n if not user or not bcrypt.verify(req.password, user["password_hash"]):\n register_login_failure(email)\n return error_response(ERR_AUTH_INVALID, "Invalid email or password", 401)\n if not user.get("is_verified"):\n return error_response(ERR_EMAIL_NOT_VERIFIED, "Email not verified", 403)\n reset_login_counter(email)\n access = create_access_token(email)\n refresh = create_refresh_token(email)\n return TokenResponse(\n access_token=access,\n access_expires_in=ACCESS_TOKEN_EXPIRE_MIN * 60,\n refresh_token=refresh,\n refresh_expires_in=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 3600,\n )\n\n@app.post("/auth/token/refresh", response_model=TokenResponse)\nasync def refresh_token(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)):\n if not credentials:\n return error_response(ERR_TOKEN_INVALID, "Missing refresh token", 401)\n token = credentials.credentials\n new_refresh = rotate_refresh_token(token)\n if not new_refresh:\n return error_response(ERR_TOKEN_INVALID, "Invalid or expired refresh token", 401)\n try:\n payload = decode_token(new_refresh)\n except Exception:\n return error_response(ERR_TOKEN_INVALID, "Invalid refresh token", 401)\n email = payload.get("sub")\n access = create_access_token(email)\n return TokenResponse(\n access_token=access,\n access_expires_in=ACCESS_TOKEN_EXPIRE_MIN * 60,\n refresh_token=new_refresh,\n refresh_expires_in=REFRESH_TOKEN_EXPIRE_DAYS * 24 * 3600,\n )\n\n@app.post("/auth/password/forgot")\nasync def forgot_password(req: ForgotPasswordRequest):\n email = req.email.lower()\n # 不暴露是否存在该邮箱,避免用户枚举。这里仍发送 200,但仅当存在时发邮件\n user = users.get(email)\n if user:\n token = str(uuid.uuid4())\n reset_tokens[token] = {"email": email, "exp": (datetime.now(timezone.utc) + timedelta(hours=1)).timestamp()}\n link = f"https://example.com/reset?token={token}"\n send_email(email, "Reset your password", f"Click to reset: {link}")\n return {"message": "If the email exists, a reset link has been sent."}\n\n@app.post("/auth/password/reset")\nasync def reset_password(req: ResetPasswordRequest):\n data = reset_tokens.get(req.token)\n if not data:\n return error_response(ERR_TOKEN_INVALID, "Invalid reset token", 400)\n if data["exp"] < time.time():\n reset_tokens.pop(req.token, None)\n return error_response(ERR_TOKEN_EXPIRED, "Reset token expired", 400)\n if not is_strong_password(req.new_password):\n return error_response(ERR_VALIDATION, "Password too weak: require upper, lower, digit and symbol")\n email = data["email"]\n user = users.get(email)\n if not user:\n return error_response(ERR_NOT_FOUND, "User not found", 404)\n user["password_hash"] = bcrypt.hash(req.new_password)\n reset_tokens.pop(req.token, None)\n return {"message": "Password updated"}\n\n@app.get("/me", response_model=UserResponse)\nasync def me(current_user: Optional[Dict[str, Any]] = Depends(get_current_user)):\n if not current_user:\n return error_response(ERR_TOKEN_INVALID, "Invalid or missing access token", 401)\n return UserResponse(id=current_user["id"], email=current_user["email"], is_verified=current_user["is_verified"])\n\n# ---------------------- 启动(开发) ----------------------\n# 运行:uvicorn main:app --reload\n\n# ---------------------- 示例(curl) ----------------------\n# 1) 注册:\n# curl -X POST http://localhost:8000/auth/register -H "Content-Type: application/json" -d '{"email":"a@b.com","password":"Aa1!aaaa"}'\n# 2) 收到日志中的 verify 链接后,访问:\n# curl "http://localhost:8000/auth/verify?token=https://example.com/verify?token=${token}; // 实际使用 GET /auth/verify\n sendEmail(key, 'Verify your account', Click to verify: ${link});\n return res.json({ message: 'Registered. Please verify your email.' });\n});\n\napp.get('/auth/verify', (req, res) => {\n const token = String(req.query.token || '');\n const data = verificationTokens.get(token);\n if (!data) return error(res, ERR.TOKEN_INVALID, 'Invalid verification token');\n if (data.exp < Math.floor(Date.now() / 1000)) {\n verificationTokens.delete(token);\n return error(res, ERR.TOKEN_EXPIRED, 'Verification token expired');\n }\n const user = users.get(data.email);\n if (!user) return error(res, ERR.NOT_FOUND, 'User not found', 404);\n user.isVerified = true;\n verificationTokens.delete(token);\n return res.json({ message: 'Email verified' });\n});\n\napp.post('/auth/login', (req, res) => {\n const parsed = LoginSchema.safeParse(req.body);\n if (!parsed.success) return error(res, ERR.VALIDATION, 'Invalid input', 400, parsed.error.flatten());\n const { email, password } = parsed.data;\n const key = email.toLowerCase();\n\n if (isLocked(key)) return error(res, ERR.AUTH_LOCKED, 'Account locked due to repeated failures. Try later.', 423);\n\n const user = users.get(key);\n if (!user || !bcrypt.compareSync(password, user.passwordHash)) {\n registerFailure(key);\n return error(res, ERR.AUTH_INVALID, 'Invalid email or password', 401);\n }\n if (!user.isVerified) return error(res, ERR.EMAIL_NOT_VERIFIED, 'Email not verified', 403);\n\n resetCounter(key);\n const access = signAccess(key);\n const refresh = signRefresh(key);\n return res.json({ token_type: 'Bearer', access_token: access, access_expires_in: ACCESS_EXPIRE_SEC, refresh_token: refresh, refresh_expires_in: REFRESH_EXPIRE_SEC });\n});\n\napp.post('/auth/token/refresh', (req, res) => {\n const h = req.headers['authorization'];\n if (!h) return error(res, ERR.TOKEN_INVALID, 'Missing refresh token', 401);\n const parts = h.split(' ');\n if (!(parts.length === 2 && parts[0] === 'Bearer')) return error(res, ERR.TOKEN_INVALID, 'Invalid Authorization header', 401);\n const old = parts[1];\n const rotated = rotateRefresh(old);\n if (!rotated.ok || !rotated.email || !rotated.token) return error(res, ERR.TOKEN_INVALID, 'Invalid or expired refresh token', 401);\n const access = signAccess(rotated.email);\n return res.json({ token_type: 'Bearer', access_token: access, access_expires_in: ACCESS_EXPIRE_SEC, refresh_token: rotated.token, refresh_expires_in: REFRESH_EXPIRE_SEC });\n});\n\napp.post('/auth/password/forgot', (req, res) => {\n const parsed = ForgotSchema.safeParse(req.body);\n if (!parsed.success) return error(res, ERR.VALIDATION, 'Invalid input', 400, parsed.error.flatten());\n const email = parsed.data.email.toLowerCase();\n if (users.has(email)) {\n const token = uuidv4();\n resetTokens.set(token, { email, exp: Math.floor(Date.now() / 1000) + 3600 });\n const link = https://example.com/reset?token=${token};\n sendEmail(email, 'Reset your password', Click to reset: ${link});\n }\n return res.json({ message: 'If the email exists, a reset link has been sent.' });\n});\n\napp.post('/auth/password/reset', (req, res) => {\n const parsed = ResetSchema.safeParse(req.body);\n if (!parsed.success) return error(res, ERR.VALIDATION, 'Invalid input', 400, parsed.error.flatten());\n const { token, new_password } = parsed.data;\n const data = resetTokens.get(token);\n if (!data) return error(res, ERR.TOKEN_INVALID, 'Invalid reset token');\n if (data.exp < Math.floor(Date.now() / 1000)) {\n resetTokens.delete(token);\n return error(res, ERR.TOKEN_EXPIRED, 'Reset token expired');\n }\n if (!isStrongPassword(new_password)) return error(res, ERR.VALIDATION, 'Password too weak: require upper, lower, digit and symbol');\n const user = users.get(data.email);\n if (!user) return error(res, ERR.NOT_FOUND, 'User not found', 404);\n user.passwordHash = bcrypt.hashSync(new_password, 10);\n resetTokens.delete(token);\n return res.json({ message: 'Password updated' });\n});\n\napp.get('/me', (req, res) => {\n const user = (req as any).user as User | undefined;\n if (!user) return error(res, ERR.TOKEN_INVALID, 'Invalid or missing access token', 401);\n return res.json({ id: user.id, email: user.email, is_verified: user.isVerified });\n});\n\n// 启动服务器\nif (require.main === module) {\n const port = process.env.PORT || 3000;\n app.listen(port, () => logger.info({ event: 'server_start', port }));\n}\n\nexport default app;\n\n// ---------------------- 示例(curl) ----------------------\n// 1) 注册:\n// curl -X POST http://localhost:3000/auth/register -H "Content-Type: application/json" -d '{"email":"a@b.com","password":"Aa1!aaaa"}'\n// 2) 验证:\n// curl "http://localhost:3000/auth/verify?token=
{"generated_code":"#!/usr/bin/env bash\n# File: run_clean.sh\n# 简要:命令行封装,调用 Python 脚本执行清洗与报告生成\nset -euo pipefail\n\nusage() {\n echo "Usage: $0 -i INPUT_CSV -o OUTPUT_CSV -r REPORT_JSON [-d DELIM] [-c CONCURRENCY] [-e ERROR_CSV]"\n echo " -i 输入CSV路径"\n echo " -o 清洗后CSV输出路径"\n echo " -r 质量报告JSON输出路径"\n echo " -d 分隔符(默认 ,)"\n echo " -c 并发度(默认 1)"\n echo " -e 错误行收集CSV(默认 <输出同目录>/errors.csv)"\n}\n\nINPUT=""\nOUTPUT=""\nREPORT=""\nDELIM="," # 默认逗号\nCONCURRENCY=1\nERRORS=""\n\nwhile getopts ":i:o:r:d:c:e:h" opt; do\n case $opt in\n i) INPUT="$OPTARG" ;;\n o) OUTPUT="$OPTARG" ;;\n r) REPORT="$OPTARG" ;;\n d) DELIM="$OPTARG" ;;\n c) CONCURRENCY="$OPTARG" ;;\n e) ERRORS="$OPTARG" ;;\n h) usage; exit 0 ;;\n \?) echo "Invalid option -$OPTARG"; usage; exit 1 ;;\n :) echo "Option -$OPTARG requires an argument."; usage; exit 1 ;;\n esac\ndone\n\nif [[ -z "${INPUT}" || -z "${OUTPUT}" || -z "${REPORT}" ]]; then\n usage\n exit 1\nfi\n\nif [[ -z "${ERRORS}" ]]; then\n outdir=$(dirname "${OUTPUT}")\n ERRORS="${outdir}/errors.csv"\nfi\n\nSCRIPT_DIR=$(cd "$(dirname "$0")" && pwd)\nexec python3 "${SCRIPT_DIR}/clean_csv.py" \\n --input "${INPUT}" \\n --output "${OUTPUT}" \\n --report "${REPORT}" \\n --delimiter "${DELIM}" \\n --concurrency "${CONCURRENCY}" \\n --error-rows "${ERRORS}"\n\n\n# ---------------------------------------------\n# File: clean_csv.py\n# 简要:读取大CSV,按email去重,缺失值填充,日期/金额规范化,输出清洗CSV与质量报告\n#!/usr/bin/env python3\nimport argparse\nimport csv\nimport json\nimport os\nimport re\nimport sys\nfrom datetime import datetime\nfrom decimal import Decimal, ROUND_HALF_UP, InvalidOperation\nfrom collections import Counter, defaultdict\nfrom concurrent.futures import ThreadPoolExecutor\n\ntry:\n from tqdm import tqdm\nexcept Exception:\n # 简易回退:无tqdm则使用空进度条\n def tqdm(iterable=None, total=None, **kwargs):\n return iterable if iterable is not None else range(total or 0)\n\n# 简单邮箱正则\nEMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")\n\n# 常见日期格式集合\nDATE_PATTERNS = [\n "%Y-%m-%d",\n "%Y/%m/%d",\n "%d/%m/%Y",\n "%m/%d/%Y",\n "%Y-%m-%d %H:%M:%S",\n "%Y/%m/%d %H:%M:%S",\n "%d-%m-%Y",\n "%d.%m.%Y",\n]\n\nAMOUNT_KEYS = ("amount", "price", "total", "cost", "fee", "amt")\nDATE_KEYS = ("date", "time")\n\n# 将值视为缺失的判定\nMISSING_TOKENS = {"", " ", "na", "n/a", "null", "none", None}\n\ndef is_missing(v: str) -> bool:\n if v is None:\n return True\n s = str(v).strip().lower()\n return s in MISSING_TOKENS\n\n# 日期标准化为 YYYY-MM-DD\ndef normalize_date(s: str) -> str:\n s = s.strip()\n for p in DATE_PATTERNS:\n try:\n dt = datetime.strptime(s, p)\n return dt.strftime("%Y-%m-%d")\n except Exception:\n pass\n # 尝试移除时间部分再匹配\n if " " in s:\n base = s.split(" ", 1)[0]\n for p in ("%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y", "%d/%m/%Y"):\n try:\n dt = datetime.strptime(base, p)\n return dt.strftime("%Y-%m-%d")\n except Exception:\n pass\n raise ValueError(f"Unrecognized date: {s}")\n\n# 金额两位小数四舍五入\ndef normalize_amount(s: str) -> str:\n s = s.strip().replace(",", "") # 去千分位逗号\n try:\n v = Decimal(s)\n except (InvalidOperation, ValueError):\n raise ValueError(f"Invalid amount: {s}")\n return str(v.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP))\n\n# 猜测列类型\ndef guess_columns(header, email_hint=None):\n lower = [h.lower() for h in header]\n # email列\n email_col = None\n if email_hint:\n for i, h in enumerate(lower):\n if h == email_hint.lower():\n email_col = header[i]\n break\n if email_col is None:\n for i, h in enumerate(lower):\n if h == "email" or "email" in h:\n email_col = header[i]\n break\n # 日期列\n date_cols = [header[i] for i, h in enumerate(lower) if any(k in h for k in DATE_KEYS)]\n # 金额列\n amount_cols = [header[i] for i, h in enumerate(lower) if any(k in h for k in AMOUNT_KEYS)]\n return email_col, date_cols, amount_cols\n\n# 清洗一批数据行\ndef clean_batch(rows, email_col, date_cols, amount_cols):\n results = []\n for idx, raw in rows:\n cleaned = dict(raw) # 拷贝\n # 统计缺失(基于原始值)\n missing_flags = {col: is_missing(raw.get(col)) for col in raw.keys()}\n # 邮箱处理\n email_val = (raw.get(email_col, "") or "").strip()\n if is_missing(email_val):\n results.append((idx, None, "missing_email", None, missing_flags, raw))\n continue\n email_key = email_val.lower()\n if not EMAIL_RE.match(email_key):\n results.append((idx, None, "invalid_email", None, missing_flags, raw))\n continue\n # 列清洗:金额、日期、其他缺失填充\n # 金额\n try:\n for c in amount_cols:\n val = raw.get(c, "")\n if is_missing(val):\n cleaned[c] = "0.00"\n else:\n cleaned[c] = normalize_amount(str(val))\n except Exception as e:\n results.append((idx, None, f"amount_error:{e}", None, missing_flags, raw))\n continue\n # 日期\n try:\n for c in date_cols:\n val = raw.get(c, "")\n if is_missing(val):\n cleaned[c] = "" # 日期缺失保留为空\n else:\n cleaned[c] = normalize_date(str(val))\n except Exception as e:\n results.append((idx, None, f"date_error:{e}", None, missing_flags, raw))\n continue\n # 其他列缺失用N/A\n for c in raw.keys():\n if c in amount_cols or c in date_cols:\n continue\n val = raw.get(c, "")\n if is_missing(val):\n cleaned[c] = "N/A"\n else:\n cleaned[c] = str(val).strip()\n results.append((idx, cleaned, None, email_key, missing_flags, raw))\n return results\n\ndef count_total_rows(path):\n total = 0\n with open(path, 'r', newline='', encoding='utf-8') as f:\n for _ in f:\n total += 1\n return max(0, total - 1) # 去除表头\n\ndef main():\n parser = argparse.ArgumentParser(description="CSV 清洗脚本:去重/补全/规范化/报告")\n parser.add_argument('--input', required=True, help='输入CSV')\n parser.add_argument('--output', required=True, help='清洗后CSV输出')\n parser.add_argument('--report', required=True, help='质量报告JSON输出')\n parser.add_argument('--delimiter', default=',', help='分隔符,默认 ,')\n parser.add_argument('--concurrency', type=int, default=1, help='并发度(线程数)')\n parser.add_argument('--error-rows', required=True, help='错误行收集CSV')\n parser.add_argument('--email-column', default=None, help='邮箱列名(默认自动识别)')\n parser.add_argument('--chunk-size', type=int, default=10000, help='分批大小,默认10000')\n\n args = parser.parse_args()\n\n if args.concurrency < 1:\n args.concurrency = 1\n\n total_rows = count_total_rows(args.input)\n\n with open(args.input, 'r', newline='', encoding='utf-8') as fin:\n reader = csv.DictReader(fin, delimiter=args.delimiter, skipinitialspace=True)\n header = reader.fieldnames or []\n if not header:\n print("空文件或无表头", file=sys.stderr)\n sys.exit(1)\n\n email_col, date_cols, amount_cols = guess_columns(header, args.email_column)\n if not email_col:\n print("无法识别邮箱列,请使用 --email-column 指定", file=sys.stderr)\n sys.exit(1)\n\n # 输出文件与错误文件\n os.makedirs(os.path.dirname(os.path.abspath(args.output)), exist_ok=True)\n os.makedirs(os.path.dirname(os.path.abspath(args.error_rows)), exist_ok=True)\n cleaned_out = open(args.output, 'w', newline='', encoding='utf-8')\n error_out = open(args.error_rows, 'w', newline='', encoding='utf-8')\n try:\n writer = csv.DictWriter(cleaned_out, fieldnames=header, delimiter=args.delimiter)\n writer.writeheader()\n error_writer = csv.DictWriter(error_out, fieldnames=header + ["error_reason"], delimiter=args.delimiter)\n error_writer.writeheader()\n\n # 统计\n seen_emails = set()\n duplicate_count = 0\n error_count = 0\n written_count = 0\n missing_counts = defaultdict(int) # 每列缺失计数(基于原始)\n dist_counters = {c: Counter() for c in header} # 字段分布(基于清洗且去重后)\n\n batch = []\n futures = []\n\n def submit_batch(executor, b):\n if not b:\n return None\n return executor.submit(clean_batch, b, email_col, date_cols, amount_cols)\n\n with ThreadPoolExecutor(max_workers=args.concurrency) as pool:\n pbar = tqdm(total=total_rows, desc="Cleaning", unit="rows")\n # 读取行并分批提交\n for idx, row in enumerate(reader, start=1):\n batch.append((idx, row))\n if len(batch) >= args.chunk_size:\n fut = submit_batch(pool, batch)\n if fut is not None:\n futures.append(fut)\n batch = []\n if batch:\n fut = submit_batch(pool, batch)\n if fut is not None:\n futures.append(fut)\n\n # 顺序处理各批结果\n for fut in futures:\n results = fut.result()\n for (idx, cleaned, err, email_key, missing_flags, raw) in results:\n # 累积缺失(原始层面)\n for c, miss in missing_flags.items():\n if miss:\n missing_counts[c] += 1\n if err is not None:\n error_count += 1\n raw_with_err = dict(raw)\n raw_with_err["error_reason"] = err\n error_writer.writerow(raw_with_err)\n else:\n # 去重:按标准化后的email\n if email_key in seen_emails:\n duplicate_count += 1\n else:\n seen_emails.add(email_key)\n writer.writerow(cleaned)\n written_count += 1\n # 分布统计(清洗且去重后)\n for c in header:\n dist_counters[c][cleaned.get(c, "")] += 1\n pbar.update(1)\n if hasattr(pbar, 'close'):\n pbar.close()\n finally:\n cleaned_out.close()\n error_out.close()\n\n # 质量报告\n missing_rate = {c: (missing_counts[c] / total_rows if total_rows > 0 else 0.0) for c in header}\n field_distribution = {\n c: [\n {"value": val, "count": cnt}\n for (val, cnt) in dist_counters[c].most_common(10)\n ]\n for c in header\n }\n\n report = {\n "input_rows": total_rows,\n "output_rows": int(sum(v["count"] for v in [{"count": cnt} for cnt in dist_counters.get(header[0], Counter()).values()])) if header else 0,\n "written_rows": written_count,\n "duplicate_count": duplicate_count,\n "error_rows": error_count,\n "missing_rate": missing_rate,\n "field_distribution": field_distribution,\n "email_column": email_col,\n "date_columns": date_cols,\n "amount_columns": amount_cols,\n "delimiter": args.delimiter,\n "concurrency": args.concurrency,\n "error_rows_output": os.path.abspath(args.error_rows),\n "cleaned_output": os.path.abspath(args.output)\n }\n\n # 修正output_rows:使用written_count\n report["output_rows"] = written_count\n\n with open(args.report, 'w', encoding='utf-8') as rf:\n json.dump(report, rf, ensure_ascii=False, indent=2)\n\n # 简要统计输出\n print(f"Done. Input: {total_rows}, Output: {written_count}, Duplicates: {duplicate_count}, Errors: {error_count}")\n\nif name == 'main':\n main()\n"}
{
"generated_code": "// =========================\n// SQL: schema.sql (PostgreSQL)\n// =========================\n-- Inventory items with idempotent UPSERT via unique SKU.\nCREATE TABLE IF NOT EXISTS inventory_item (\n sku VARCHAR(64) PRIMARY KEY,\n quantity INTEGER NOT NULL CHECK (quantity >= 0),\n external_updated_at TIMESTAMPTZ,\n checksum TEXT,\n updated_at TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\n-- Maintain global sync state for inventory resource ETag and heartbeat.\nCREATE TABLE IF NOT EXISTS sync_state (\n resource VARCHAR(64) PRIMARY KEY,\n etag TEXT,\n last_synced_at TIMESTAMPTZ NOT NULL DEFAULT now()\n);\n\n-- Useful indexes for queries/maintenance.\nCREATE INDEX IF NOT EXISTS idx_inventory_item_updated_at ON inventory_item (updated_at DESC);\nCREATE INDEX IF NOT EXISTS idx_inventory_item_checksum ON inventory_item (checksum);\n\n-- Seed sync_state for 'inventory' resource if missing.\nINSERT INTO sync_state (resource, etag)\nVALUES ('inventory', NULL)\nON CONFLICT (resource) DO NOTHING;\n\n-- Example UPSERT used by repository layer\n-- INSERT INTO inventory_item (sku, quantity, external_updated_at, checksum, updated_at)\n-- VALUES ($1, $2, $3, $4, now())\n-- ON CONFLICT (sku) DO UPDATE SET\n-- quantity = EXCLUDED.quantity,\n-- external_updated_at = EXCLUDED.external_updated_at,\n-- checksum = EXCLUDED.checksum,\n-- updated_at = now();\n\n\n// =====================================================\n// Java: src/main/java/com/example/inventorysync/config/AppConfig.java\n// =====================================================\npackage com.example.inventorysync.config;\n\nimport io.netty.channel.ChannelOption;\nimport io.netty.handler.timeout.ReadTimeoutHandler;\nimport io.netty.handler.timeout.WriteTimeoutHandler;\nimport org.slf4j.MDC;\nimport org.springframework.beans.factory.annotation.Value;\nimport org.springframework.context.annotation.Bean;\nimport org.springframework.context.annotation.Configuration;\nimport org.springframework.http.client.reactive.ReactorClientHttpConnector;\nimport org.springframework.scheduling.annotation.EnableScheduling;\nimport org.springframework.web.reactive.function.client.ExchangeStrategies;\nimport org.springframework.web.reactive.function.client.WebClient;\nimport reactor.netty.http.client.HttpClient;\n\nimport java.time.Duration;\n\n@Configuration\n@EnableScheduling\npublic class AppConfig {\n\n @Bean\n public WebClient webClient(\n @Value("${inventory.api.base-url}") String baseUrl,\n @Value("${inventory.http.max-in-memory-size:10MB}") String maxInMem\n ) {\n int maxBytes = (int) (parseSizeToBytes(maxInMem));\n\n HttpClient httpClient = HttpClient.create()\n .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)\n .responseTimeout(Duration.ofSeconds(15))\n .doOnConnected(conn -> conn\n .addHandlerLast(new ReadTimeoutHandler(15))\n .addHandlerLast(new WriteTimeoutHandler(15))\n );\n\n ExchangeStrategies strategies = ExchangeStrategies.builder()\n .codecs(c -> c.defaultCodecs().maxInMemorySize(maxBytes))\n .build();\n\n return WebClient.builder()\n .baseUrl(baseUrl)\n .exchangeStrategies(strategies)\n .clientConnector(new ReactorClientHttpConnector(httpClient))\n .defaultHeader("User-Agent", "inventory-sync-service/1.0")\n .filter((request, next) -> {\n // Propagate traceId from MDC to outgoing requests\n String traceId = MDC.get("traceId");\n if (traceId != null) {\n return next.exchange(\n ClientRequest.from(request)\n .header("X-Trace-Id", traceId)\n .build()\n );\n }\n return next.exchange(request);\n })\n .build();\n }\n\n private long parseSizeToBytes(String size) {\n String s = size.trim().toUpperCase();\n if (s.endsWith("KB")) return Long.parseLong(s.replace("KB", "").trim()) * 1024L;\n if (s.endsWith("MB")) return Long.parseLong(s.replace("MB", "").trim()) * 1024L * 1024L;\n if (s.endsWith("B")) return Long.parseLong(s.replace("B", "").trim());\n return Long.parseLong(s);\n }\n}\n\n\n// ==================================================================\n// Java: src/main/java/com/example/inventorysync/dto/InventoryItem.java\n// ==================================================================\npackage com.example.inventorysync.dto;\n\nimport java.time.Instant;\nimport java.util.Objects;\n\n// Inventory DTO representing an item from external API and local table.\npublic class InventoryItem {\n private final String sku;\n private final int quantity;\n private final Instant externalUpdatedAt;\n private final String checksum; // Optional: hash of source payload for change detection\n\n public InventoryItem(String sku, int quantity, Instant externalUpdatedAt, String checksum) {\n this.sku = sku;\n this.quantity = quantity;\n this.externalUpdatedAt = externalUpdatedAt;\n this.checksum = checksum;\n }\n\n public String getSku() { return sku; }\n public int getQuantity() { return quantity; }\n public Instant getExternalUpdatedAt() { return externalUpdatedAt; }\n public String getChecksum() { return checksum; }\n\n @Override\n public boolean equals(Object o) {\n if (this == o) return true;\n if (!(o instanceof InventoryItem)) return false;\n InventoryItem that = (InventoryItem) o;\n return quantity == that.quantity &&\n Objects.equals(sku, that.sku) &&\n Objects.equals(externalUpdatedAt, that.externalUpdatedAt) &&\n Objects.equals(checksum, that.checksum);\n }\n\n @Override\n public int hashCode() { return Objects.hash(sku, quantity, externalUpdatedAt, checksum); }\n\n @Override\n public String toString() {\n return "InventoryItem{" +\n "sku='" + sku + '\'' +\n ", quantity=" + quantity +\n ", externalUpdatedAt=" + externalUpdatedAt +\n ", checksum='" + checksum + '\'' +\n '}';\n }\n}\n\n\n// ==================================================================================\n// Java: src/main/java/com/example/inventorysync/repository/InventoryRepository.java\n// ==================================================================================\npackage com.example.inventorysync.repository;\n\nimport com.example.inventorysync.dto.InventoryItem;\nimport org.springframework.transaction.annotation.Transactional;\n\nimport java.util.List;\n\npublic interface InventoryRepository {\n @Transactional\n void upsertBatch(List
通过生成高效且高质量的代码片段,为开发者提供一站式代码解决方案。无论是快速打样、构建功能模块,还是满足不同编程语言和个人化需求,该提示词能够帮助开发者显著提升开发效率,让技术人员专注于核心业务创新。
无需扎实的代码功底,通过简洁描述便可生成代码片段,快速入门编程,轻松理解代码逻辑。
在项目紧急或者开发需求繁杂的情况下,即刻生成标准化代码片段,提效减负,一键完成日常需求。
为团队提供便捷开发工具,优化代码模块标准化输出,统一项目代码风格,提高沟通与协作效率。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期