热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
针对指定代码进行全面安全审查,识别逻辑错误、潜在缺陷及漏洞,并提供可执行的优化方案与防护建议,帮助开发团队提升代码质量、可维护性和安全性,适用于安全评估与代码审查场景。
下面是针对这段 Flask/Python 代码的全面审查与改进建议,结合 OWASP Top 10 的问题分类,并给出可直接落地的修复版本示例,满足“基础登录与个人资料读取接口、支持记住登录体验与简单角色展示”的目标。
主要问题与风险(映射 OWASP Top 10)
改进目标与原则
参考修复实现示例(依赖:Flask、PyJWT、werkzeug.security) 说明:
代码: import os import sqlite3 import datetime from flask import Flask, request, jsonify, make_response import jwt from werkzeug.security import generate_password_hash, check_password_hash
app = Flask(name) app.config['SECRET_KEY'] = os.getenv('SECRET_KEY', 'change-this-in-env') app.config['COOKIE_SECURE'] = os.getenv('COOKIE_SECURE', '0') == '1' # 在 HTTPS 环境设为 1 DB_PATH = 'app.db'
def get_db(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn
def get_user_by_username(username): with get_db() as conn: return conn.execute( "SELECT id, username, role, password_hash FROM users WHERE username = ?", (username,) ).fetchone()
def issue_token(user_id, username, role, remember=False): exp = datetime.datetime.utcnow() + (datetime.timedelta(days=30) if remember else datetime.timedelta(hours=8)) payload = {'sub': user_id, 'username': username, 'role': role, 'exp': exp} return jwt.encode(payload, app.config['SECRET_KEY'], algorithm='HS256')
_attempts = {} ATTEMPT_WINDOW = 300 ATTEMPT_LIMIT = 5
def record_and_check_attempt(key): now = datetime.datetime.utcnow().timestamp() arr = [t for t in _attempts.get(key, []) if now - t < ATTEMPT_WINDOW] if len(arr) >= ATTEMPT_LIMIT: _attempts[key] = arr return True arr.append(now) _attempts[key] = arr return False
@app.route('/login', methods=['POST']) def login(): if not request.is_json: return jsonify({'ok': False, 'msg': 'invalid content type'}), 400 data = request.get_json() or {} username = (data.get('username') or '').strip() password = data.get('password') or '' remember = bool(data.get('remember', False))
if not username or not password:
return jsonify({'ok': False, 'msg': 'invalid credentials'}), 401
key = f"{request.remote_addr}:{username}"
if record_and_check_attempt(key):
return jsonify({'ok': False, 'msg': 'too many attempts'}), 429
user = get_user_by_username(username)
if not user or not check_password_hash(user['password_hash'], password):
return jsonify({'ok': False, 'msg': 'invalid credentials'}), 401
token = issue_token(user['id'], user['username'], user['role'], remember)
resp = make_response(jsonify({'ok': True, 'role': user['role']}))
resp.set_cookie(
'session',
token,
httponly=True,
secure=app.config['COOKIE_SECURE'],
samesite='Lax',
max_age=30*24*3600 if remember else None
)
return resp
@app.route('/profile', methods=['GET']) def profile(): token = request.cookies.get('session', '') if not token: return jsonify({'ok': False, 'msg': 'no session'}), 401 try: payload = jwt.decode(token, app.config['SECRET_KEY'], algorithms=['HS256']) except jwt.ExpiredSignatureError: return jsonify({'ok': False, 'msg': 'session expired'}), 401 except jwt.InvalidTokenError: return jsonify({'ok': False, 'msg': 'invalid session'}), 401 return jsonify({'ok': True, 'user': payload.get('username'), 'role': payload.get('role')})
@app.route('/logout', methods=['POST']) def logout(): resp = make_response(jsonify({'ok': True})) resp.set_cookie('session', '', max_age=0) return resp
if name == 'main': # 仅限本地开发使用 debug;演示或生产禁用 app.run(debug=False)
数据库示例与密码哈希迁移
进一步安全与设计建议
通过上述修复与建议,模块将基本满足原型联调与演示的需求,并显著降低注入、认证与会话相关的安全风险,符合 OWASP Top 10 的核心防护要点。
总体问题与风险(结合CWE/SANS)
满足目标的改进原则
参考修正版示例(保留流式上传,同时安全化) 说明:使用 busboy 进行 multipart/form-data 流式解析;白名单 MIME;随机文件名;大小限制;安全静态挂载;基础防护中间件与速率限制。若前端确实只发 application/octet-stream 原始流,也可以增加一个 /upload/raw 路由,逻辑相同并做长度限制。
const express = require('express'); const path = require('path'); const fs = require('fs'); const fsp = require('fs/promises'); const crypto = require('crypto'); const helmet = require('helmet'); const Busboy = require('busboy'); const { pipeline } = require('stream/promises'); const rateLimit = require('express-rate-limit');
const app = express();
app.disable('x-powered-by'); app.use(helmet({ contentSecurityPolicy: false })); // 可根据前端实际 CSP 配置开启 const UPLOADS_DIR = path.resolve(__dirname, 'public', 'uploads'); const STATIC_ROOT = path.resolve(__dirname, 'public');
// 白名单 MIME -> 扩展名 const ALLOWED = { 'image/jpeg': '.jpg', 'image/png': '.png', 'image/webp': '.webp', 'application/pdf': '.pdf' }; const MAX_SIZE = 10 * 1024 * 1024; // 10MB,按需调整
// 基础静态资源(非上传目录) app.use(express.static(STATIC_ROOT, { dotfiles: 'ignore', etag: true, fallthrough: true, setHeaders: (res) => { res.setHeader('X-Content-Type-Options', 'nosniff'); } }));
// 单独挂载上传目录,设置更严格头部 app.use('/uploads', express.static(UPLOADS_DIR, { dotfiles: 'ignore', etag: true, fallthrough: true, setHeaders: (res, filePath) => { res.setHeader('X-Content-Type-Options', 'nosniff'); const ext = path.extname(filePath).toLowerCase(); // 对非图片类附件强制下载,减少浏览器内执行风险 if (ext === '.pdf') { res.setHeader('Content-Disposition', 'attachment; filename="' + path.basename(filePath) + '"'); } } }));
// 简易鉴权中间件占位(替换为实际 JWT/Session 校验) function requireAuth(req, res, next) { // 示例:检查 Authorization Bearer 或 Cookie // if (!tokenIsValid(req)) return res.status(401).json({ ok: false, error: 'unauthorized' }); next(); }
// 速率限制,避免暴力 DoS const uploadLimiter = rateLimit({ windowMs: 60 * 1000, max: 30, standardHeaders: true, legacyHeaders: false });
// 确保上传目录存在 async function ensureUploadDir() { await fsp.mkdir(UPLOADS_DIR, { recursive: true, mode: 0o750 }); }
// 路由:安全化流式上传(multipart/form-data) app.post('/upload', requireAuth, uploadLimiter, async (req, res, next) => { try { const ctype = req.headers['content-type'] || ''; if (!ctype.startsWith('multipart/form-data')) { return res.status(415).json({ ok: false, error: 'multipart/form-data required' }); }
await ensureUploadDir();
const bb = Busboy({ headers: req.headers, limits: { files: 1, fileSize: MAX_SIZE } });
let responded = false;
bb.on('file', (name, file, info) => {
const mime = info.mimeType;
const ext = ALLOWED[mime];
if (!ext) {
file.resume();
bb.emit('error', new Error('unsupported file type'));
return;
}
const id = crypto.randomUUID();
const filename = id + ext;
const fullPath = path.join(UPLOADS_DIR, filename);
const resolved = path.resolve(fullPath);
const base = path.resolve(UPLOADS_DIR) + path.sep;
if (!resolved.startsWith(base)) {
file.resume();
bb.emit('error', new Error('bad path'));
return;
}
const out = fs.createWriteStream(resolved, { flags: 'wx', mode: 0o640 });
pipeline(file, out)
.then(() => {
if (!responded) {
responded = true;
// 审计示例:可记录 req.user、IP、文件ID、mime、大小(Busboy info 有 size 近似)
// auditLog({ user: req.user?.id, ip: req.ip, id, mime });
res.json({ ok: true, path: '/uploads/' + filename });
}
})
.catch(async (err) => {
try { await fsp.unlink(resolved); } catch {}
if (!responded) {
responded = true;
res.status(400).json({ ok: false, error: err.message });
}
});
});
bb.on('error', (err) => {
if (!responded) {
responded = true;
res.status(400).json({ ok: false, error: err.message });
}
});
bb.on('finish', () => {
if (!responded) {
responded = true;
res.status(400).json({ ok: false, error: 'no file received' });
}
});
req.on('aborted', () => {
bb.removeAllListeners();
});
req.pipe(bb);
} catch (err) { next(err); } });
// 其他页面 app.get('/files', (req, res) => { res.sendFile(path.join(STATIC_ROOT, 'index.html')); });
// 统一错误处理中间件 app.use((err, req, res, next) => { // 生产环境可接入结构化日志 console.error(err); res.status(500).json({ ok: false, error: 'internal server error' }); });
app.listen(3000);
额外防护与运维建议
通过以上改造,可以在保持“前端流式上传、后端落盘并返回可访问 URL”的原型目标的同时,显著降低路径遍历、无限制上传、DoS、XSS、未鉴权等高危风险,符合生产环境的基本安全基线(CWE/SANS)。
以下是对当前 Go 代码的全面审查与改进建议,目标是让微服务在集成环境中以安全、快速的方式验证基本鉴权流程与错误处理,并符合常见安全规范(参考 CERT、OWASP API Security)。
一、主要问题与风险
二、威胁场景简述
三、分阶段改进与修复建议(优先级从高到低)
四、参考实现(简化版,适用于集成环境演示) 说明:
代码示例(可直接替换当前 main): package main
import ( "context" "encoding/json" "errors" "log" "net" "net/http" "os" "strings" "time"
"github.com/golang-jwt/jwt/v5"
"golang.org/x/time/rate"
)
type ctxKey string
var userKey ctxKey = "user"
type Claims struct {
Scope string json:"scope"
jwt.RegisteredClaims
}
type apiError struct {
Code string json:"code"
Message string json:"message"
}
func writeJSON(w http.ResponseWriter, status int, body any) { w.Header().Set("Content-Type", "application/json") w.Header().Set("Cache-Control", "no-store") w.WriteHeader(status) _ = json.NewEncoder(w).Encode(body) }
func validateJWT(tok string, secret []byte) (string, error) { parser := jwt.NewParser( jwt.WithValidMethods([]string{"HS256"}), jwt.WithIssuedAt(), jwt.WithLeeway(60*time.Second), ) claims := &Claims{} _, err := parser.ParseWithClaims(tok, claims, func(t *jwt.Token) (any, error) { return secret, nil }) if err != nil { return "", err } // 颁发者/受众校验(根据你的环境调整) if claims.Issuer != "demo-issuer" { return "", errors.New("invalid iss") } if !claims.VerifyAudience("demo-aud", true) { return "", errors.New("invalid aud") } // scope 示例:要求 secure:read if claims.Scope != "secure:read" { return "", errors.New("insufficient scope") } // exp/nbf/iat 已由 parser 校验 return claims.Subject, nil }
type ipLimiter struct { limiters map[string]*rate.Limiter r rate.Limit b int }
func newIpLimiter(r rate.Limit, b int) *ipLimiter { return &ipLimiter{limiters: make(map[string]*rate.Limiter), r: r, b: b} }
func (l *ipLimiter) get(ip string) *rate.Limiter { if lim, ok := l.limiters[ip]; ok { return lim } lim := rate.NewLimiter(l.r, l.b) l.limiters[ip] = lim return lim }
func authMiddleware(next http.Handler, secret []byte, ipLim *ipLimiter) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { ip, _, _ := net.SplitHostPort(r.RemoteAddr) if ip == "" { ip = "unknown" } if !ipLim.get(ip).Allow() { writeJSON(w, http.StatusTooManyRequests, apiError{Code: "rate_limited", Message: "too many requests"}) return }
auth := strings.TrimSpace(r.Header.Get("Authorization"))
if !strings.HasPrefix(auth, "Bearer ") {
w.Header().Set("WWW-Authenticate", `Bearer realm="secure"`)
writeJSON(w, http.StatusUnauthorized, apiError{Code: "unauthorized", Message: "invalid or missing token"})
return
}
tok := strings.TrimSpace(strings.TrimPrefix(auth, "Bearer "))
if tok == "" {
w.Header().Set("WWW-Authenticate", `Bearer realm="secure"`)
writeJSON(w, http.StatusUnauthorized, apiError{Code: "unauthorized", Message: "invalid or missing token"})
return
}
user, err := validateJWT(tok, secret)
if err != nil {
// 审计日志:不记录 token 原文
log.Printf("auth_failed ip=%s err=%v", ip, err)
w.Header().Set("WWW-Authenticate", `Bearer error="invalid_token"`)
writeJSON(w, http.StatusUnauthorized, apiError{Code: "unauthorized", Message: "invalid or missing token"})
return
}
ctx := context.WithValue(r.Context(), userKey, user)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func secureHandler(w http.ResponseWriter, r *http.Request) { user, _ := r.Context().Value(userKey).(string) writeJSON(w, http.StatusOK, map[string]string{ "message": "hello", "user": user, }) }
func main() { secret := []byte(os.Getenv("JWT_SECRET")) if len(secret) == 0 { log.Fatal("JWT_SECRET not set") }
mux := http.NewServeMux()
ipLim := newIpLimiter(5, 10) // 每秒约 5 次,突发 10 次,按需调整
mux.Handle("/secure", authMiddleware(http.HandlerFunc(secureHandler), secret, ipLim))
srv := &http.Server{
Addr: ":8080",
Handler: mux,
ReadHeaderTimeout: 2 * time.Second,
ReadTimeout: 5 * time.Second,
WriteTimeout: 10 * time.Second,
IdleTimeout: 60 * time.Second,
// MaxHeaderBytes 可在需要时设置,例如 1<<20
}
log.Println("listening on :8080")
// 开发环境可用明文;集成/生产环境建议置于启用 TLS 的反向代理后
log.Fatal(srv.ListenAndServe())
}
五、与已知漏洞清单的对照修复
六、进一步建议(按需实施)
以上改造能在不牺牲“快速接入与演示”的前提下,使 /secure 的鉴权更接近标准实践,降低伪造、重放与资源耗尽风险,并提升错误处理与审计可用性。
帮助开发人员快速识别代码中的潜在缺陷、逻辑错误和安全漏洞,并基于专业建议优化代码质量,为开发过程中的代码安全与稳定性提供保障。
帮助开发者快速找到复杂代码中的缺陷与安全漏洞,并提供优化解决方案,节省排查时间,提高开发质量。
为技术负责人提供全面的代码安全审查能力,确保团队输出的代码满足安全与质量要求,降低风险。
自动识别代码在运行环境下可能存在的漏洞或性能问题,并给出优化建议,助力稳定和高效发布。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
免费获取高级提示词-优惠即将到期