This prompt is designed to add professional exception handling to Python code: it identifies potential risk points in the code and builds a complete error-handling framework around them. Through systematic exception capture, categorized handling, and improved error messages, it significantly raises code robustness and maintainability. It supports many common exception scenarios, including file operations, network requests, and data conversion, and produces clear error logs and user-friendly messages to help developers locate and fix problems quickly.
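As a minimal illustration of the transformation this prompt performs (the function name and messages here are invented for the example), an unguarded file read becomes one with a precise exception type, a contextual log entry, and a clear failure path:

import logging

# Before: a missing file crashes the run with a bare traceback.
def read_config_unsafe(path):
    with open(path, 'r', encoding='utf-8') as f:
        return f.read()

# After: the failure is caught precisely, logged with context, and surfaced
# as a clear user-facing error instead of an unhandled traceback.
def read_config_safe(path):
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        logging.error('Config file not found: %s', path)
        raise SystemExit(f'Missing required config file: {path}')

The three worked examples below apply the same pattern at larger scale.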
Original Code Analysis
Potential exception risk points identified, by category: missing or inaccessible input directory, file encoding/format errors, missing fields, numeric conversion failures, and output write failures.
Overall strategy: catch specific exception types at each risk point, log errors with context (including stack traces), skip per-file problems so batch processing continues, and exit with distinct status codes on fatal input/output failures, all without changing the business logic.
Enhanced Code
import os
import csv
import json
import sys
import logging
from datetime import datetime
"""
批量扫描目录下的CSV账目文件,合并并输出汇总JSON。
风险点:目录不存在、文件编码/格式异常、字段缺失、数值转换失败、写盘失败。
当前未做任何异常处理和日志记录。
"""
# 基础日志配置:INFO级别,时间+级别+消息;错误时包含堆栈
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s"
)
def scan_csvs(input_dir):
result = []
try:
names = os.listdir(input_dir)
    except FileNotFoundError:
        logging.error("Input directory does not exist: %s", input_dir, exc_info=True)
        raise
    except NotADirectoryError:
        logging.error("Input path is not a directory: %s", input_dir, exc_info=True)
        raise
    except PermissionError:
        logging.error("No permission to access directory: %s", input_dir, exc_info=True)
        raise
    except OSError as e:
        logging.error("Failed to read directory: %s (%s)", input_dir, e, exc_info=True)
        raise
for name in names:
if name.lower().endswith('.csv'):
path = os.path.join(input_dir, name)
try:
                # newline='' avoids newline-handling issues when parsing CSV
with open(path, 'r', encoding='utf-8', newline='') as f:
try:
reader = csv.DictReader(f)
for row in reader:
row['source'] = name
if 'amount' in row:
try:
row['amount'] = float((row['amount'] or '').strip() or '0')
except (ValueError, TypeError):
                                    # Non-numeric or malformed value: default to 0 and warn, but do not abort
                                    line_no = getattr(reader, "line_num", -1)
                                    logging.warning(
                                        "File %s, row %s: failed to parse amount; set to 0. Original value=%r",
                                        name, line_no, row.get('amount'), exc_info=False
                                    )
row['amount'] = 0.0
result.append(row)
except csv.Error as e:
logging.error("解析CSV失败:文件=%s,错误=%s", path, e, exc_info=True)
# 跳过该文件,继续处理其他文件
continue
            except FileNotFoundError:
                logging.warning("File missing during scan (possibly moved/deleted concurrently): %s", path, exc_info=True)
                continue
            except PermissionError:
                logging.error("No permission to read file: %s", path, exc_info=True)
                continue
            except UnicodeDecodeError:
                logging.error("File encoding error (not UTF-8, or corrupted): %s", path, exc_info=True)
                continue
            except OSError as e:
                logging.error("Failed to read file: %s (%s)", path, e, exc_info=True)
                continue
return result
def summarize(rows):
total = 0.0
by_cat = {}
for r in rows:
amt = r.get('amount', 0.0) or 0.0
try:
total += amt
except TypeError:
            # Defense in depth: if upstream unexpectedly left a non-numeric value, treat it as 0 and warn
            logging.warning("Record has an amount of unexpected type, treated as 0: %r", amt, exc_info=False)
amt = 0.0
total += 0.0
cat = r.get('category') or 'unknown'
by_cat[cat] = by_cat.get(cat, 0.0) + (amt if isinstance(amt, (int, float)) else 0.0)
return {
'total': total,
'by_category': by_cat,
'count': len(rows)
}
def main():
input_dir = sys.argv[1] if len(sys.argv) > 1 else './data'
out_file = sys.argv[2] if len(sys.argv) > 2 else './output/summary.json'
try:
rows = scan_csvs(input_dir)
except (FileNotFoundError, NotADirectoryError, PermissionError, OSError):
        # Fatal: the input directory is unusable
        logging.error("Aborting: cannot scan input directory %s", input_dir)
sys.exit(1)
summary = summarize(rows)
payload = {
'generated_at': datetime.utcnow().isoformat(),
'summary': summary
}
    # Handle an empty output-directory string (meaning the current directory)
out_dir = os.path.dirname(out_file) or '.'
try:
os.makedirs(out_dir, exist_ok=True)
    except PermissionError:
        logging.error("Cannot create output directory (permission denied): %s", out_dir, exc_info=True)
        sys.exit(2)
    except OSError as e:
        logging.error("Failed to create output directory: %s (%s)", out_dir, e, exc_info=True)
        sys.exit(2)
try:
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
    except PermissionError:
        logging.error("Cannot write output file (permission denied): %s", out_file, exc_info=True)
        sys.exit(3)
    except FileNotFoundError:
        logging.error("Invalid output file path: %s", out_file, exc_info=True)
        sys.exit(3)
    except TypeError as e:
        logging.error("JSON serialization failed: %s", e, exc_info=True)
        sys.exit(3)
    except OSError as e:
        logging.error("Failed to write file: %s (%s)", out_file, e, exc_info=True)
        sys.exit(3)
print(f"Processed {len(rows)} rows, total={summary['total']}")
if __name__ == '__main__':
main()
Exception Handling Rationale
Per-file problems (missing file, permission denied, bad encoding, malformed CSV) are logged and skipped so one bad file cannot abort the whole batch. Directory-level failures are fatal and exit with distinct status codes (1 for scanning, 2 for creating the output directory, 3 for writing). Unparseable amount values degrade to 0 with a warning instead of interrupting processing.
Usage Suggestions
Run the script with an optional input directory and output path (defaults: ./data and ./output/summary.json), and check the log output for warnings about skipped files or zeroed amounts before trusting the summary.
Original Code Analysis
Key exception risk points identified: no timeouts or retries, unchecked HTTP status codes, JSON parsing failures, disk write failures, and missing environment variables.
To ensure strict, precise exception handling without changing business logic, we add timeouts, targeted retries, explicit status checks, and thorough error handling with clear logs.
Enhanced Code
import os
import json
import time
import logging
from urllib.parse import urlparse
import requests
"""
从服务端分页拉取内容并按需下载附件。
风险点:无超时/重试、HTTP错误未检查、JSON解析失败、磁盘写入失败、环境变量缺失。
"""
# ---- Configuration for robust networking and logging ----
REQUEST_TIMEOUT = (3.05, 15) # (connect timeout, read timeout)
MAX_RETRIES = 3
BACKOFF_FACTOR = 0.6 # exponential backoff: 0.6, 1.2, 2.4...
# Basic logger setup (stdout/stderr)
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
BASE_URL = os.getenv('SERVICE_URL', 'https://api.example.com')
TOKEN = os.getenv('API_TOKEN', '')
def _validate_base_url(url: str) -> None:
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
logger.warning(
"BASE_URL %r appears invalid (missing scheme or host). "
"Please set SERVICE_URL like 'https://api.example.com'.",
url
)
def _sleep_backoff(attempt: int) -> None:
# attempt starts at 1
delay = BACKOFF_FACTOR * (2 ** (attempt - 1))
time.sleep(delay)
def _request_with_retry(url: str, *, headers=None, stream: bool = False, timeout=REQUEST_TIMEOUT) -> requests.Response:
"""
GET with strict error handling:
- Timeout and retry on transient errors (Timeout, ConnectionError)
- Retry on 5xx, 429, 408; do not retry on other 4xx
- Raise for non-2xx statuses
"""
headers = headers or {}
last_exc = None
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = requests.get(url, headers=headers, stream=stream, timeout=timeout)
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as http_err:
status = resp.status_code
# Retry server-side errors and known transient client-side codes
if status >= 500 or status in (408, 429):
logger.warning(
"HTTP %s on %s (attempt %d/%d); will retry. Detail: %s",
status, url, attempt, MAX_RETRIES, http_err
)
last_exc = http_err
else:
# Non-retryable client errors
logger.error("HTTP %s on %s; not retrying. Detail: %s", status, url, http_err)
raise
else:
return resp # success path
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as net_err:
logger.warning(
"Network error on %s (attempt %d/%d); will retry. Detail: %s",
url, attempt, MAX_RETRIES, net_err
)
last_exc = net_err
except requests.exceptions.RequestException as req_err:
# Other non-retryable request errors (InvalidURL, TooManyRedirects, etc.)
logger.error("Request error on %s; not retrying. Detail: %s", url, req_err)
raise
# Retry if not last attempt
if attempt < MAX_RETRIES:
_sleep_backoff(attempt)
# If we exhausted retries, raise the last exception
assert last_exc is not None
raise last_exc
def fetch_page(page=1):
url = f"{BASE_URL}/v1/items?page={page}"
headers = {'Authorization': f'Bearer {TOKEN}'} if TOKEN else {}
# Validate base URL once (non-fatal)
_validate_base_url(BASE_URL)
# Perform request with timeout, status check, and retries
resp = _request_with_retry(url, headers=headers, stream=False, timeout=REQUEST_TIMEOUT)
# JSON parsing with robust error reporting
try:
data = resp.json()
except json.JSONDecodeError as e:
# Try to include a small snippet to aid debugging (avoid huge logs)
snippet = ""
try:
snippet = resp.text[:200]
except Exception:
pass
logger.error(
"Failed to parse JSON for page=%s from %s. Error: %s. Body snippet: %r",
page, url, e, snippet
)
raise
return data
def download_asset(item, folder='./downloads'):
url = item.get('asset_url')
if not url:
logger.info("Item %r has no asset_url; skipping download.", item.get('id', '<unknown>'))
return None
# Ensure item has an 'id' for filename
item_id = item.get('id')
if item_id is None:
logger.warning("Missing 'id' in item; cannot derive filename. Skipping download.")
return None
# Prepare folder
try:
os.makedirs(folder, exist_ok=True)
except OSError as e:
logger.error("Failed to create folder %s: %s", folder, e)
return None
filename = os.path.join(folder, f"{item_id}.bin")
# Download with retry and streaming
try:
resp = _request_with_retry(url, headers={}, stream=True, timeout=REQUEST_TIMEOUT)
except requests.exceptions.RequestException as e:
logger.error("Failed to start download for item %s from %s: %s", item_id, url, e)
return None
# Stream to disk safely; clean up partial files on failure
try:
with open(filename, 'wb') as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk: # filters keep-alive chunks
f.write(chunk)
except requests.exceptions.RequestException as e:
logger.error("Network error while streaming item %s from %s: %s", item_id, url, e)
try:
if os.path.exists(filename):
os.remove(filename)
except OSError:
pass
return None
except OSError as e:
logger.error("File write error for %s: %s", filename, e)
try:
if os.path.exists(filename):
os.remove(filename)
except OSError:
pass
return None
finally:
try:
resp.close()
except Exception:
pass
return filename
def sync_all():
page = 1
total = 0
while True:
try:
data = fetch_page(page)
except requests.exceptions.RequestException as e:
logger.error("Stopping sync due to request error on page %s: %s", page, e)
break
except json.JSONDecodeError as e:
logger.error("Stopping sync due to JSON parsing error on page %s: %s", page, e)
break
if not isinstance(data, dict):
logger.error("Unexpected response type on page %s: %r", page, type(data))
break
items = data.get('items', [])
if not isinstance(items, list):
logger.error("Unexpected 'items' type on page %s: %r", page, type(items))
break
if not items:
break
for it in items:
if isinstance(it, dict) and it.get('download', False):
# download_asset handles its own exceptions and logs; no need to catch here
download_asset(it)
total += 1
page += 1
# Respectful pacing between pages
time.sleep(0.2)
print(f'Synced {total} items')
if __name__ == '__main__':
sync_all()
Exception Handling Rationale
Transient failures (timeouts, connection errors, HTTP 5xx/429/408) are retried up to MAX_RETRIES with exponential backoff; other 4xx responses and request errors fail fast. JSON parsing and response-shape problems stop the sync with a clear log entry, while per-item download failures are logged and skipped, with partial files cleaned up.
Usage Suggestions
Set SERVICE_URL and API_TOKEN in the environment before running, and tune REQUEST_TIMEOUT, MAX_RETRIES, and BACKOFF_FACTOR to match the behavior of your service.
Original Code Analysis
Main exception risk points and categories identified: missing input file, malformed JSON, division by zero on an empty series, invalid window parameter, and failed type conversions.
To keep the business logic unchanged, this pass only adds explicit validation and precise exception capture; failures produce a clear message and a graceful exit, and the normal output structure and computation flow are untouched.
Enhanced Code
import json
import sys
import statistics
from pathlib import Path
from json import JSONDecodeError
"""
读取JSON中的数值序列,计算均值、标准化与移动平均并输出。
风险点:文件不存在、JSON格式错误、空序列导致除零、窗口参数不合法、类型转换失败。
"""
def load_numbers(path):
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Input file does not exist: {path}") from e
    except PermissionError as e:
        raise PermissionError(f"No permission to read file: {path}") from e
    except JSONDecodeError as e:
        raise ValueError(f"JSON parse failure: {path} (line {e.lineno}, col {e.colno}): {e.msg}") from e
    except OSError as e:
        raise OSError(f"Failed to read file: {path}: {e.strerror}") from e
    if not isinstance(data, dict) or 'values' not in data:
        raise ValueError("JSON is missing the 'values' field or has the wrong shape (expected a top-level dict containing a 'values' key)")
    raw = data['values']
    if not isinstance(raw, (list, tuple)):
        raise ValueError(f"'values' must be an array (list/tuple), got: {type(raw).__name__}")
series = []
for idx, x in enumerate(raw):
try:
series.append(float(x))
        except (ValueError, TypeError) as e:
            raise ValueError(f"Element {idx} cannot be converted to float: {x!r}") from e
return series
def normalize(series):
    if not isinstance(series, (list, tuple)):
        raise TypeError("normalize(series) argument must be a list or tuple")
    if len(series) == 0:
        raise ValueError("Cannot compute mean/stdev of an empty series")
    mean = statistics.mean(series)
    stdev = statistics.pstdev(series)
    if stdev == 0:
        raise ZeroDivisionError("Series standard deviation is 0; cannot normalize (all values are identical)")
return [(x - mean) / stdev for x in series]
def moving_avg(series, window):
    if not isinstance(window, int):
        raise TypeError(f"Window must be an integer, got: {type(window).__name__}")
    if window <= 0:
        raise ValueError(f"Window must be a positive integer, got: {window}")
out = []
for i in range(len(series)):
if i + 1 < window:
out.append(None)
else:
seg = series[i + 1 - window:i + 1]
out.append(sum(seg) / window)
return out
def save_result(path, payload):
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Output directory missing or path invalid: {path}") from e
    except PermissionError as e:
        raise PermissionError(f"No permission to write file: {path}") from e
    except TypeError as e:
        # payload normally holds only JSON-serializable basic types; this is defensive
        raise TypeError(f"Result data is not JSON-serializable: {e}") from e
    except OSError as e:
        raise OSError(f"Failed to write file: {path}: {e.strerror}") from e
def main():
try:
in_path = Path(sys.argv[1] if len(sys.argv) > 1 else 'input.json')
out_path = Path(sys.argv[2] if len(sys.argv) > 2 else 'result.json')
if len(sys.argv) > 3:
try:
window = int(sys.argv[3])
            except ValueError:
                print(f"Error: invalid window argument '{sys.argv[3]}'; it must be a positive integer.", file=sys.stderr)
                sys.exit(2)
else:
window = 5
series = load_numbers(in_path)
norm = normalize(series)
ma = moving_avg(series, window)
result = {
'count': len(series),
'mean': sum(series) / len(series),
'normalized': norm,
'moving_avg': ma
}
save_result(out_path, result)
print(f'Wrote {out_path} with {len(series)} values')
    except (FileNotFoundError, PermissionError, OSError) as e:
        print(f"File/system error: {e}", file=sys.stderr)
        sys.exit(1)
    except (ValueError, TypeError, ZeroDivisionError) as e:
        print(f"Data/argument error: {e}", file=sys.stderr)
        sys.exit(1)
if __name__ == '__main__':
main()
Exception Handling Rationale
I/O failures are re-raised as the same exception types with path context attached via raise ... from e; data problems surface as ValueError, TypeError, or ZeroDivisionError carrying the offending index or value. main() maps file/system errors and data/argument errors to exit code 1 with a message on stderr, and uses exit code 2 for an invalid window argument.
Usage Suggestions
Invoke with an optional input path, output path, and integer window size (defaults: input.json, result.json, and 5); note that the first window-1 moving-average entries are null in the output JSON.
Use it at key checkpoints such as code review, pre-release walkthroughs, and legacy-system hardening to add professional-grade exception handling to Python code in one pass. By systematically identifying risk points, handling them by severity, and improving error messages, it markedly reduces production incidents and rework, shortens troubleshooting time, and improves the stability users actually perceive. The output includes a risk checklist, the hardened code, the handling rationale, and usage suggestions, forming a closed loop from discovery to fix without changing core business logic. It fits data processing, web backends, automation scripts, and other scenarios, helping teams codify a shared standard and replicate best practices quickly.
Harden the exception handling of services and jobs in one pass before release, standardizing error messages and logs to cut production incidents and manual troubleshooting time.
Automatically add categorized exception capture and degradation strategies to ETL and data-cleaning scripts, so dirty data does not interrupt the pipeline and traceable error records are produced.
For batch file processing, network downloads, and directory traversal, automatically generate robust error handling and retry suggestions so jobs do not die halfway through.
Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick individual trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing easy. Suited to developer integration and embedding in business systems; a hedged sketch of such a call follows below.
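As a minimal sketch of that integration, assuming a hypothetical endpoint, template id, token, and field names (the real URL, authentication scheme, and parameters come from the template provider's documentation):

# Hypothetical integration sketch: the endpoint, template id, token, and
# field names below are illustrative assumptions, not a documented API.
import json
from urllib.request import Request, urlopen

API_URL = 'https://api.example.com/v1/prompts/render'  # placeholder endpoint

payload = {
    'template_id': 'python-exception-hardening',  # hypothetical template id
    'params': {'language': 'python', 'code': "data = open('f.csv').read()"},
}

req = Request(
    API_URL,
    data=json.dumps(payload).encode('utf-8'),
    headers={
        'Content-Type': 'application/json',
        'Authorization': 'Bearer <YOUR_TOKEN>',  # placeholder credential
    },
)

# The rendered prompt (or the model's reply, depending on the service)
# comes back as JSON.
with urlopen(req, timeout=15) as resp:
    print(json.loads(resp.read().decode('utf-8')))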
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.