Popular roles are more than a source of inspiration; they are your efficiency assistant. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs. Creation becomes easier and the value more direct.
We keep updating the role library for different user needs, so you can always find a suitable entry point for inspiration.
This prompt is designed for Python development. It analyzes code structure and automatically adds standardized logging. Through systematic log-level configuration, exception capture, and performance monitoring, it helps developers quickly build maintainable applications. It supports custom log formats and output targets, giving code full observability in production while keeping it clean and professional. It suits web development, data processing, automation scripts, and many other Python scenarios.
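As a quick illustration of what "custom log formats and output targets" can look like, here is a minimal sketch (written for this page, not produced by the prompt itself) that sends WARNING and above to the console and keeps full DEBUG detail in a size-rotated file. RotatingFileHandler is a standard-library class; the file path, size limit, and backup count are arbitrary assumptions.

import logging
from logging.handlers import RotatingFileHandler
from pathlib import Path

def setup_dual_logging(log_file: str = "logs/app.log") -> None:
    """Console for WARNING+, rotating file for full DEBUG detail (illustrative defaults)."""
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    root = logging.getLogger()
    if root.handlers:  # avoid duplicate handlers on repeated calls
        return
    root.setLevel(logging.DEBUG)

    console = logging.StreamHandler()
    console.setLevel(logging.WARNING)
    console.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))

    file_handler = RotatingFileHandler(
        log_file, maxBytes=5_000_000, backupCount=3, encoding="utf-8"
    )
    file_handler.setLevel(logging.DEBUG)
    file_handler.setFormatter(logging.Formatter(
        "%(asctime)s | %(levelname)s | %(name)s:%(lineno)d | %(message)s"
    ))

    root.addHandler(console)
    root.addHandler(file_handler)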
Code modification notes
Enhanced complete code
import csv
import json
import logging
import time
from pathlib import Path
from statistics import mean
logger = logging.getLogger(__name__)
def configure_logging(log_file: str = "logs/app.log", level: int = logging.DEBUG) -> None:
    """
    Basic file logging configuration: DEBUG level, detailed format, file output only.
    """
    # Avoid adding duplicate handlers (e.g. when called more than once or executed on import)
    root = logging.getLogger()
    if root.handlers:
        return
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    fmt = (
        "%(asctime)s | %(levelname)s | pid=%(process)d tid=%(thread)d | "
        "%(name)s:%(funcName)s:%(lineno)d | %(message)s"
    )
    datefmt = "%Y-%m-%d %H:%M:%S"
    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(level)
    file_handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))
    root.setLevel(level)
    root.addHandler(file_handler)
def _elapsed_ms(start: float) -> float:
    return (time.perf_counter() - start) * 1000.0

def read_csv(path):
    t0 = time.perf_counter()
    logger.debug("Reading CSV file: %s", path)
    rows = []
    try:
        with open(path, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                rows.append(row)
        logger.info("Loaded rows: %d (%.2f ms)", len(rows), _elapsed_ms(t0))
        return rows
    except Exception:
        logger.exception("Failed to read CSV: %s (%.2f ms)", path, _elapsed_ms(t0))
        raise
def transform(rows):
    t0 = time.perf_counter()
    logger.debug("Start transform with %d rows", len(rows))
    converted = []
    error_count = 0
    for r in rows:
        try:
            amount = float((r.get("amount", "0").strip()) or 0)
            category = (r.get("category") or "unknown").strip().lower()
            converted.append({"category": category, "amount": amount})
        except Exception:
            # Log only the necessary fields to avoid leaking potentially sensitive data
            sample = {"category": r.get("category"), "amount": r.get("amount")}
            logger.warning("Bad row skipped: %s", sample, exc_info=True)
            error_count += 1
    logger.info(
        "Transform done: in=%d out=%d errors=%d (%.2f ms)",
        len(rows),
        len(converted),
        error_count,
        _elapsed_ms(t0),
    )
    return converted
def aggregate(rows):
    t0 = time.perf_counter()
    logger.debug("Start aggregate with %d rows", len(rows))
    sums = {}
    for r in rows:
        sums.setdefault(r["category"], []).append(r["amount"])
    result = [
        {"category": k, "total": sum(v), "avg": round(mean(v), 2)}
        for k, v in sums.items()
    ]
    preview = result if len(result) <= 5 else result[:5]
    logger.info(
        "Aggregate done: categories=%d out=%d (%.2f ms) preview=%s%s",
        len(sums),
        len(result),
        _elapsed_ms(t0),
        preview,
        " ... (truncated)" if len(result) > 5 else "",
    )
    return result

def write_json(data, out_path):
    t0 = time.perf_counter()
    logger.debug("Writing JSON file: %s", out_path)
    try:
        out_path = Path(out_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        size = out_path.stat().st_size if out_path.exists() else 0
        items = len(data) if isinstance(data, list) else 1
        logger.info(
            "Wrote JSON: items=%d size=%d bytes path=%s (%.2f ms)",
            items,
            size,
            str(out_path),
            _elapsed_ms(t0),
        )
    except Exception:
        logger.exception("Failed to write JSON: %s (%.2f ms)", out_path, _elapsed_ms(t0))
        raise
def main():
    configure_logging(log_file="logs/app.log", level=logging.DEBUG)
    pipeline_t0 = time.perf_counter()
    src = "data/sales.csv"
    dst = "build/sales_summary.json"
    logger.info("Pipeline started: src=%s dst=%s", src, dst)

    step_t0 = time.perf_counter()
    rows = read_csv(src)
    logger.debug("Step read_csv took %.2f ms", _elapsed_ms(step_t0))

    step_t0 = time.perf_counter()
    rows = transform(rows)
    logger.debug("Step transform took %.2f ms", _elapsed_ms(step_t0))

    step_t0 = time.perf_counter()
    report = aggregate(rows)
    logger.debug("Step aggregate took %.2f ms", _elapsed_ms(step_t0))

    step_t0 = time.perf_counter()
    write_json(report, dst)
    logger.debug("Step write_json took %.2f ms", _elapsed_ms(step_t0))

    logger.info("Pipeline finished in %.2f ms", _elapsed_ms(pipeline_t0))

if __name__ == "__main__":
    main()
Logging configuration example
import logging
import logging.config
from pathlib import Path
def setup_logging_dictconfig(log_file="logs/app.log", level="DEBUG"):
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "detailed": {
                    "format": "%(asctime)s | %(levelname)s | pid=%(process)d tid=%(thread)d | "
                    "%(name)s:%(funcName)s:%(lineno)d | %(message)s",
                    "datefmt": "%Y-%m-%d %H:%M:%S",
                }
            },
            "handlers": {
                "file": {
                    "class": "logging.FileHandler",
                    "level": level,
                    "formatter": "detailed",
                    "filename": log_file,
                    "encoding": "utf-8",
                }
            },
            "root": {"level": level, "handlers": ["file"]},
        }
    )
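A short usage sketch for the dictConfig helper above; the logger name and message are illustrative only:

if __name__ == "__main__":
    setup_logging_dictconfig(log_file="logs/app.log", level="INFO")
    logging.getLogger("demo").info("dictConfig-based logging is active")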
Usage suggestions
import argparse
import shutil
from pathlib import Path
import time
import logging
logger = logging.getLogger(__name__)
def setup_logging(level=logging.WARNING):
    """Configure console logging with a standard format."""
    root = logging.getLogger()
    if root.handlers:
        # Avoid duplicate handlers if setup is called multiple times
        return
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s"
    )
    handler.setFormatter(formatter)
    root.addHandler(handler)
    root.setLevel(level)

def scan_and_archive(src, archive):
    logger.info("Scanning %s ...", src)
    src_p = Path(src)
    archive_p = Path(archive)
    archive_p.mkdir(parents=True, exist_ok=True)
    processed = 0
    for p in src_p.glob('*.log'):
        try:
            dest = archive_p / f'{p.stem}_{int(time.time())}.log'
            shutil.copy2(p, dest)
            logger.debug("Archived %s -> %s", p.name, dest.name)
            processed += 1
        except Exception:
            # Log full stack for diagnostics; paths are non-sensitive
            logger.exception("Archive error for %s", p)
    logger.info("Archived %d files from %s to %s", processed, src, archive)

def cli():
    parser = argparse.ArgumentParser(description='archive recent log files')
    parser.add_argument('--source', required=True, help='directory of logs')
    parser.add_argument('--archive', required=True, help='archive directory')
    parser.add_argument('--interval', type=int, default=60, help='scan interval seconds')
    parser.add_argument('--once', action='store_true', help='run once then exit')
    args = parser.parse_args()
    logger.info("Starting archiver with args: %s", args)
    while True:
        scan_and_archive(args.source, args.archive)
        if args.once:
            break
        time.sleep(args.interval)

if __name__ == '__main__':
    # Default to WARNING level per requirement; console output only
    setup_logging(logging.WARNING)
    cli()
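Assuming the script above is saved as, say, log_archiver.py (the filename and directories are placeholders), a one-off run could look like:

python log_archiver.py --source /var/log/myapp --archive /var/log/myapp/archive --once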
import logging
def setup_logging(level=logging.WARNING):
    handler = logging.StreamHandler()  # console
    formatter = logging.Formatter(
        fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s"
    )
    handler.setFormatter(formatter)
    root = logging.getLogger()
    if not root.handlers:
        root.addHandler(handler)
        root.setLevel(level)

# Usage examples
# setup_logging(logging.WARNING)  # default: only WARNING/ERROR/CRITICAL is emitted
# setup_logging(logging.INFO)     # use when you want to see scan and summary messages
# setup_logging(logging.DEBUG)    # when troubleshooting, shows per-file archiving detail
With a single conversation, give any Python project production-grade logging. By automatically identifying critical paths and exception points, inserting logs according to convention, and unifying output formats and targets, it helps engineering teams go from zero to one, or from chaos to standard, in minutes. It works for web services, data jobs, and automation scripts, leaving existing business logic untouched while significantly improving observability and maintainability.
APIs and services: fill in request-chain, status, and exception logs to locate slow requests and timed-out calls, shortening troubleshooting time from the day of release.
Batch jobs, data pipelines, and validation flows: automatically record stage progress, input/output volumes, and failure details, supporting resumable reruns and source tracing.
Scheduled and ad-hoc scripts: generate unified logs where exceptions include parameters and environment information, with output to files or a platform for alerting and tracing (see the sketch after this list).
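As a hedged illustration of the kind of instrumentation these scenarios call for, the decorator below logs call duration and, on failure, the exception together with the arguments and some basic environment context. It uses only the standard library; the names log_call and fetch_report are illustrative, not part of the prompt's output.

import functools
import logging
import os
import platform
import time

logger = logging.getLogger(__name__)

def log_call(func):
    """Log how long each call takes; on failure, log the arguments plus basic environment info."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            logger.info("%s finished in %.2f ms", func.__name__, (time.perf_counter() - start) * 1000)
            return result
        except Exception:
            logger.exception(
                "%s failed after %.2f ms | args=%r kwargs=%r | python=%s pid=%d cwd=%s",
                func.__name__,
                (time.perf_counter() - start) * 1000,
                args,
                kwargs,
                platform.python_version(),
                os.getpid(),
                os.getcwd(),
            )
            raise
    return wrapper

@log_call
def fetch_report(day: str) -> dict:
    # Illustrative target; replace with a real request handler, pipeline stage, or script step.
    raise RuntimeError("simulated upstream timeout")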
Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight use.
Turn the prompt template into an API: your program can adjust template parameters freely and call it directly over the interface, making automation and batch processing easy (a hypothetical call is sketched after this list). Suited to developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client so that your AI application calls the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between different AI tools.
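For the API mode, the exact endpoint and payload depend on the platform; the snippet below is only a hypothetical sketch of what a programmatic call with template parameters might look like, using nothing but the standard library. The URL, token placeholder, and field names are made up and must be replaced with the values from your own API console.

import json
import urllib.request

# Hypothetical endpoint and payload shape (not a documented API).
API_URL = "https://example.com/api/prompt-templates/python-logging/run"
payload = {
    "parameters": {"log_level": "DEBUG", "output": "file"},
    "code": "def read_csv(path): ...",
}
req = urllib.request.Request(
    API_URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json", "Authorization": "Bearer <YOUR_TOKEN>"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read().decode("utf-8")))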