不止热门角色,我们为你扩展了更多细分角色分类,覆盖职场提升、商业增长、内容创作、学习规划等多元场景。精准匹配不同目标,让每一次生成都更有方向、更高命中率。
立即探索更多角色分类,找到属于你的增长加速器。
原始代码分析:识别到的潜在异常风险点与分类:
整体策略:
增强后代码
import csv
import json
import logging
import os
import sys
from datetime import datetime, timezone
"""
批量扫描目录下的CSV账目文件,合并并输出汇总JSON。
风险点:目录不存在、文件编码/格式异常、字段缺失、数值转换失败、写盘失败。
当前未做任何异常处理和日志记录。
"""
# 基础日志配置:INFO级别,时间+级别+消息;错误时包含堆栈
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s"
)
def scan_csvs(input_dir):
result = []
try:
names = os.listdir(input_dir)
except FileNotFoundError:
logging.error("输入目录不存在:%s", input_dir, exc_info=True)
raise
except NotADirectoryError:
logging.error("输入路径不是目录:%s", input_dir, exc_info=True)
raise
except PermissionError:
logging.error("没有访问目录权限:%s", input_dir, exc_info=True)
raise
except OSError as e:
logging.error("读取目录失败:%s(%s)", input_dir, e, exc_info=True)
raise
for name in names:
if name.lower().endswith('.csv'):
path = os.path.join(input_dir, name)
try:
# newline='' 以避免CSV换行兼容问题
with open(path, 'r', encoding='utf-8', newline='') as f:
try:
reader = csv.DictReader(f)
for row in reader:
row['source'] = name
if 'amount' in row:
try:
row['amount'] = float((row['amount'] or '').strip() or '0')
except (ValueError, TypeError):
# 非数字或格式异常时,按0处理并告警,但不中断处理
line_no = getattr(reader, "line_num", -1)
logging.warning(
"文件 %s 第 %s 行 amount 解析失败,已置为 0。原值=%r",
name, line_no, row.get('amount'), exc_info=False
)
row['amount'] = 0.0
result.append(row)
except csv.Error as e:
logging.error("解析CSV失败:文件=%s,错误=%s", path, e, exc_info=True)
# 跳过该文件,继续处理其他文件
continue
except FileNotFoundError:
logging.warning("扫描时文件不存在(可能被并发移动/删除):%s", path, exc_info=True)
continue
except PermissionError:
logging.error("没有读取文件权限:%s", path, exc_info=True)
continue
except UnicodeDecodeError:
logging.error("文件编码错误(非UTF-8或损坏):%s", path, exc_info=True)
continue
except OSError as e:
logging.error("读取文件失败:%s(%s)", path, e, exc_info=True)
continue
return result
def summarize(rows):
total = 0.0
by_cat = {}
for r in rows:
amt = r.get('amount', 0.0) or 0.0
try:
total += amt
except TypeError:
# 双重保护:若上游仍意外留下了非数值,置零并告警
logging.warning("记录 amount 类型异常,已按0处理:%r", amt, exc_info=False)
amt = 0.0
total += 0.0
cat = r.get('category') or 'unknown'
by_cat[cat] = by_cat.get(cat, 0.0) + (amt if isinstance(amt, (int, float)) else 0.0)
return {
'total': total,
'by_category': by_cat,
'count': len(rows)
}
def main():
input_dir = sys.argv[1] if len(sys.argv) > 1 else './data'
out_file = sys.argv[2] if len(sys.argv) > 2 else './output/summary.json'
try:
rows = scan_csvs(input_dir)
except (FileNotFoundError, NotADirectoryError, PermissionError, OSError):
# 致命错误:输入目录不可用
logging.error("终止:无法扫描输入目录 %s", input_dir)
sys.exit(1)
summary = summarize(rows)
payload = {
'generated_at': datetime.utcnow().isoformat(),
'summary': summary
}
# 处理输出目录为空字符串的情况(表示当前目录)
out_dir = os.path.dirname(out_file) or '.'
try:
os.makedirs(out_dir, exist_ok=True)
except PermissionError:
logging.error("无法创建输出目录(权限不足):%s", out_dir, exc_info=True)
sys.exit(2)
except OSError as e:
logging.error("创建输出目录失败:%s(%s)", out_dir, e, exc_info=True)
sys.exit(2)
try:
with open(out_file, 'w', encoding='utf-8') as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
except PermissionError:
logging.error("无法写入输出文件(权限不足):%s", out_file, exc_info=True)
sys.exit(3)
except FileNotFoundError:
logging.error("输出文件路径无效:%s", out_file, exc_info=True)
sys.exit(3)
except TypeError as e:
logging.error("JSON 序列化失败:%s", e, exc_info=True)
sys.exit(3)
except OSError as e:
logging.error("写入文件失败:%s(%s)", out_file, e, exc_info=True)
sys.exit(3)
print(f"Processed {len(rows)} rows, total={summary['total']}")
if __name__ == '__main__':
main()
异常处理说明
使用建议
Original Code Analysis — key exception risk points identified:
To ensure strict, precise exception handling without changing business logic, we add timeouts, targeted retries, explicit status checks, and thorough error handling with clear logs.
Enhanced Code
import os
import json
import time
import logging
from urllib.parse import urlparse
import requests
"""
从服务端分页拉取内容并按需下载附件。
风险点:无超时/重试、HTTP错误未检查、JSON解析失败、磁盘写入失败、环境变量缺失。
"""
# ---- Configuration for robust networking and logging ----
REQUEST_TIMEOUT = (3.05, 15) # (connect timeout, read timeout)
MAX_RETRIES = 3
BACKOFF_FACTOR = 0.6 # exponential backoff: 0.6, 1.2, 2.4...
# Basic logger setup (stdout/stderr)
logger = logging.getLogger(__name__)
if not logger.handlers:
handler = logging.StreamHandler()
formatter = logging.Formatter(
fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
)
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
BASE_URL = os.getenv('SERVICE_URL', 'https://api.example.com')
TOKEN = os.getenv('API_TOKEN', '')
def _validate_base_url(url: str) -> None:
parsed = urlparse(url)
if not parsed.scheme or not parsed.netloc:
logger.warning(
"BASE_URL %r appears invalid (missing scheme or host). "
"Please set SERVICE_URL like 'https://api.example.com'.",
url
)
def _sleep_backoff(attempt: int) -> None:
# attempt starts at 1
delay = BACKOFF_FACTOR * (2 ** (attempt - 1))
time.sleep(delay)
def _request_with_retry(url: str, *, headers=None, stream: bool = False, timeout=REQUEST_TIMEOUT) -> requests.Response:
"""
GET with strict error handling:
- Timeout and retry on transient errors (Timeout, ConnectionError)
- Retry on 5xx, 429, 408; do not retry on other 4xx
- Raise for non-2xx statuses
"""
headers = headers or {}
last_exc = None
for attempt in range(1, MAX_RETRIES + 1):
try:
resp = requests.get(url, headers=headers, stream=stream, timeout=timeout)
try:
resp.raise_for_status()
except requests.exceptions.HTTPError as http_err:
status = resp.status_code
# Retry server-side errors and known transient client-side codes
if status >= 500 or status in (408, 429):
logger.warning(
"HTTP %s on %s (attempt %d/%d); will retry. Detail: %s",
status, url, attempt, MAX_RETRIES, http_err
)
last_exc = http_err
else:
# Non-retryable client errors
logger.error("HTTP %s on %s; not retrying. Detail: %s", status, url, http_err)
raise
else:
return resp # success path
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError) as net_err:
logger.warning(
"Network error on %s (attempt %d/%d); will retry. Detail: %s",
url, attempt, MAX_RETRIES, net_err
)
last_exc = net_err
except requests.exceptions.RequestException as req_err:
# Other non-retryable request errors (InvalidURL, TooManyRedirects, etc.)
logger.error("Request error on %s; not retrying. Detail: %s", url, req_err)
raise
# Retry if not last attempt
if attempt < MAX_RETRIES:
_sleep_backoff(attempt)
# If we exhausted retries, raise the last exception
assert last_exc is not None
raise last_exc
def fetch_page(page=1):
url = f"{BASE_URL}/v1/items?page={page}"
headers = {'Authorization': f'Bearer {TOKEN}'} if TOKEN else {}
# Validate base URL once (non-fatal)
_validate_base_url(BASE_URL)
# Perform request with timeout, status check, and retries
resp = _request_with_retry(url, headers=headers, stream=False, timeout=REQUEST_TIMEOUT)
# JSON parsing with robust error reporting
try:
data = resp.json()
except json.JSONDecodeError as e:
# Try to include a small snippet to aid debugging (avoid huge logs)
snippet = ""
try:
snippet = resp.text[:200]
except Exception:
pass
logger.error(
"Failed to parse JSON for page=%s from %s. Error: %s. Body snippet: %r",
page, url, e, snippet
)
raise
return data
def download_asset(item, folder='./downloads'):
url = item.get('asset_url')
if not url:
logger.info("Item %r has no asset_url; skipping download.", item.get('id', '<unknown>'))
return None
# Ensure item has an 'id' for filename
item_id = item.get('id')
if item_id is None:
logger.warning("Missing 'id' in item; cannot derive filename. Skipping download.")
return None
# Prepare folder
try:
os.makedirs(folder, exist_ok=True)
except OSError as e:
logger.error("Failed to create folder %s: %s", folder, e)
return None
filename = os.path.join(folder, f"{item_id}.bin")
# Download with retry and streaming
try:
resp = _request_with_retry(url, headers={}, stream=True, timeout=REQUEST_TIMEOUT)
except requests.exceptions.RequestException as e:
logger.error("Failed to start download for item %s from %s: %s", item_id, url, e)
return None
# Stream to disk safely; clean up partial files on failure
try:
with open(filename, 'wb') as f:
for chunk in resp.iter_content(chunk_size=8192):
if chunk: # filters keep-alive chunks
f.write(chunk)
except requests.exceptions.RequestException as e:
logger.error("Network error while streaming item %s from %s: %s", item_id, url, e)
try:
if os.path.exists(filename):
os.remove(filename)
except OSError:
pass
return None
except OSError as e:
logger.error("File write error for %s: %s", filename, e)
try:
if os.path.exists(filename):
os.remove(filename)
except OSError:
pass
return None
finally:
try:
resp.close()
except Exception:
pass
return filename
def sync_all():
page = 1
total = 0
while True:
try:
data = fetch_page(page)
except requests.exceptions.RequestException as e:
logger.error("Stopping sync due to request error on page %s: %s", page, e)
break
except json.JSONDecodeError as e:
logger.error("Stopping sync due to JSON parsing error on page %s: %s", page, e)
break
if not isinstance(data, dict):
logger.error("Unexpected response type on page %s: %r", page, type(data))
break
items = data.get('items', [])
if not isinstance(items, list):
logger.error("Unexpected 'items' type on page %s: %r", page, type(items))
break
if not items:
break
for it in items:
if isinstance(it, dict) and it.get('download', False):
# download_asset handles its own exceptions and logs; no need to catch here
download_asset(it)
total += 1
page += 1
# Respectful pacing between pages
time.sleep(0.2)
print(f'Synced {total} items')
if __name__ == '__main__':
sync_all()
Exception Handling Rationale
Usage Suggestions
原始代码分析:识别到的主要异常风险点与类别:
为保证业务逻辑不变,本次增强仅增加显式校验与精准捕获,异常时给出清晰提示并优雅退出,不改变正常情况下的输出结构与计算流程。
增强后代码
import json
import sys
import statistics
from pathlib import Path
from json import JSONDecodeError
"""
读取JSON中的数值序列,计算均值、标准化与移动平均并输出。
风险点:文件不存在、JSON格式错误、空序列导致除零、窗口参数不合法、类型转换失败。
"""
def load_numbers(path):
try:
with open(path, 'r', encoding='utf-8') as f:
data = json.load(f)
except FileNotFoundError as e:
raise FileNotFoundError(f"输入文件不存在: {path}") from e
except PermissionError as e:
raise PermissionError(f"无权限读取文件: {path}") from e
except JSONDecodeError as e:
raise ValueError(f"JSON解析失败: {path} (行{e.lineno} 列{e.colno}): {e.msg}") from e
except OSError as e:
raise OSError(f"读取文件失败: {path}: {e.strerror}") from e
if not isinstance(data, dict) or 'values' not in data:
raise ValueError("JSON中缺少'values'字段或格式不正确(应为顶层字典且包含'values'键)")
raw = data['values']
if not isinstance(raw, (list, tuple)):
raise ValueError(f"'values'应为数组(list/tuple),实际为: {type(raw).__name__}")
series = []
for idx, x in enumerate(raw):
try:
series.append(float(x))
except (ValueError, TypeError) as e:
raise ValueError(f"第{idx}个元素无法转换为浮点数: {x!r}") from e
return series
def normalize(series):
if not isinstance(series, (list, tuple)):
raise TypeError("normalize(series) 参数必须为list或tuple")
if len(series) == 0:
raise ValueError("空序列无法计算均值/标准差")
mean = statistics.mean(series)
stdev = statistics.pstdev(series)
if stdev == 0:
raise ZeroDivisionError("序列标准差为0,无法进行标准化(所有值相同)")
return [(x - mean) / stdev for x in series]
def moving_avg(series, window):
if not isinstance(window, int):
raise TypeError(f"窗口参数必须为整数,实际为: {type(window).__name__}")
if window <= 0:
raise ValueError(f"窗口参数必须为正整数,当前: {window}")
out = []
for i in range(len(series)):
if i + 1 < window:
out.append(None)
else:
seg = series[i + 1 - window:i + 1]
out.append(sum(seg) / window)
return out
def save_result(path, payload):
try:
with open(path, 'w', encoding='utf-8') as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
except FileNotFoundError as e:
raise FileNotFoundError(f"输出目录不存在或路径无效: {path}") from e
except PermissionError as e:
raise PermissionError(f"无权限写入文件: {path}") from e
except TypeError as e:
# 通常payload为基础类型可序列化,此处为防御性处理
raise TypeError(f"结果数据无法序列化为JSON: {e}") from e
except OSError as e:
raise OSError(f"写入文件失败: {path}: {e.strerror}") from e
def main():
try:
in_path = Path(sys.argv[1] if len(sys.argv) > 1 else 'input.json')
out_path = Path(sys.argv[2] if len(sys.argv) > 2 else 'result.json')
if len(sys.argv) > 3:
try:
window = int(sys.argv[3])
except ValueError as e:
print(f"错误: 无效的窗口参数 '{sys.argv[3]}',必须为正整数。", file=sys.stderr)
sys.exit(2)
else:
window = 5
series = load_numbers(in_path)
norm = normalize(series)
ma = moving_avg(series, window)
result = {
'count': len(series),
'mean': sum(series) / len(series),
'normalized': norm,
'moving_avg': ma
}
save_result(out_path, result)
print(f'Wrote {out_path} with {len(series)} values')
except (FileNotFoundError, PermissionError, OSError) as e:
print(f"文件/系统错误: {e}", file=sys.stderr)
sys.exit(1)
except (ValueError, TypeError, ZeroDivisionError) as e:
print(f"数据/参数错误: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()
异常处理说明
使用建议
用于在代码评审、上线前走查与遗留系统加固等关键环节,一键为Python代码补齐专业级异常处理。通过系统识别风险点、分级处置与信息优化,显著降低线上故障与返工成本,缩短排障时间,提升用户可感知的稳定性与体验。输出包含风险清单、加固后的代码、处理思路与使用建议,形成从发现到修复的闭环,且不改变核心业务逻辑。适配数据处理、Web后端、自动化脚本等多种场景,帮助团队沉淀统一规范,快速复制最佳实践。