¥
立即购买

Python代码日志增强专家

0 浏览
0 试用
0 购买
Dec 10, 2025更新

本提示词专为Python开发场景设计,能够智能分析代码结构并自动添加规范的日志记录功能。通过系统化的日志级别配置、异常捕获和性能监控,帮助开发者快速构建可维护的应用程序。支持自定义日志格式和输出目标,确保代码在生产环境中具备完整的可观测性,同时保持代码的整洁性和专业性。适用于Web开发、数据处理、自动化脚本等多种Python应用场景。

代码修改说明

  • 用标准库 logging 替换所有 print,统一输出到文件,日志级别默认 DEBUG。
  • 为关键路径添加性能监控,记录每个阶段的耗时(毫秒):read_csv、transform、aggregate、write_json 以及整体流水线。
  • 在可能失败的I/O操作(读CSV、写JSON)中加入异常日志(logger.exception)并继续抛出,保证行为与原逻辑一致但可观测性更强。
  • 对数据质量问题(transform 时的坏行)记录 WARNING 级别日志,并附带简化后的行内容与异常栈,避免敏感信息泄露。
  • 统一详细(detailed)日志格式,包含时间、级别、进程/线程、模块、函数、行号等上下文信息。
  • 保持原有业务逻辑和返回值不变,日志记录避免额外的性能开销(采用惰性格式化、限制大对象输出预览)。

增强后的完整代码

import csv
import json
import logging
import time
from pathlib import Path
from statistics import mean


logger = logging.getLogger(__name__)


def configure_logging(log_file: str = "logs/app.log", level: int = logging.DEBUG) -> None:
    """
    基础文件日志配置:DEBUG 级别、详细格式、仅文件输出。
    """
    # 避免重复添加 handler(例如被多次调用或被导入执行)
    root = logging.getLogger()
    if root.handlers:
        return

    Path(log_file).parent.mkdir(parents=True, exist_ok=True)

    fmt = (
        "%(asctime)s | %(levelname)s | pid=%(process)d tid=%(thread)d | "
        "%(name)s:%(funcName)s:%(lineno)d | %(message)s"
    )
    datefmt = "%Y-%m-%d %H:%M:%S"

    file_handler = logging.FileHandler(log_file, encoding="utf-8")
    file_handler.setLevel(level)
    file_handler.setFormatter(logging.Formatter(fmt=fmt, datefmt=datefmt))

    root.setLevel(level)
    root.addHandler(file_handler)


def _elapsed_ms(start: float) -> float:
    return (time.perf_counter() - start) * 1000.0


def read_csv(path):
    t0 = time.perf_counter()
    logger.debug("Reading CSV file: %s", path)
    rows = []
    try:
        with open(path, newline="", encoding="utf-8") as f:
            reader = csv.DictReader(f)
            for row in reader:
                rows.append(row)
        logger.info("Loaded rows: %d (%.2f ms)", len(rows), _elapsed_ms(t0))
        return rows
    except Exception:
        logger.exception("Failed to read CSV: %s (%.2f ms)", path, _elapsed_ms(t0))
        raise


def transform(rows):
    t0 = time.perf_counter()
    logger.debug("Start transform with %d rows", len(rows))
    converted = []
    error_count = 0
    for r in rows:
        try:
            amount = float((r.get("amount", "0").strip()) or 0)
            category = (r.get("category") or "unknown").strip().lower()
            converted.append({"category": category, "amount": amount})
        except Exception:
            # 仅记录必要字段,避免潜在敏感信息泄漏
            sample = {"category": r.get("category"), "amount": r.get("amount")}
            logger.warning("Bad row skipped: %s", sample, exc_info=True)
            error_count += 1
    logger.info(
        "Transform done: in=%d out=%d errors=%d (%.2f ms)",
        len(rows),
        len(converted),
        error_count,
        _elapsed_ms(t0),
    )
    return converted


def aggregate(rows):
    t0 = time.perf_counter()
    logger.debug("Start aggregate with %d rows", len(rows))
    sums = {}
    for r in rows:
        sums.setdefault(r["category"], []).append(r["amount"])
    result = [
        {"category": k, "total": sum(v), "avg": round(mean(v), 2)}
        for k, v in sums.items()
    ]
    preview = result if len(result) <= 5 else result[:5]
    logger.info(
        "Aggregate done: categories=%d out=%d (%.2f ms) preview=%s%s",
        len(sums),
        len(result),
        _elapsed_ms(t0),
        preview,
        " ... (truncated)" if len(result) > 5 else "",
    )
    return result


def write_json(data, out_path):
    t0 = time.perf_counter()
    logger.debug("Writing JSON file: %s", out_path)
    try:
        out_path = Path(out_path)
        out_path.parent.mkdir(parents=True, exist_ok=True)
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)
        size = out_path.stat().st_size if out_path.exists() else 0
        items = len(data) if isinstance(data, list) else 1
        logger.info(
            "Wrote JSON: items=%d size=%d bytes path=%s (%.2f ms)",
            items,
            size,
            str(out_path),
            _elapsed_ms(t0),
        )
    except Exception:
        logger.exception("Failed to write JSON: %s (%.2f ms)", out_path, _elapsed_ms(t0))
        raise


def main():
    configure_logging(log_file="logs/app.log", level=logging.DEBUG)

    pipeline_t0 = time.perf_counter()
    src = "data/sales.csv"
    dst = "build/sales_summary.json"
    logger.info("Pipeline started: src=%s dst=%s", src, dst)

    step_t0 = time.perf_counter()
    rows = read_csv(src)
    logger.debug("Step read_csv took %.2f ms", _elapsed_ms(step_t0))

    step_t0 = time.perf_counter()
    rows = transform(rows)
    logger.debug("Step transform took %.2f ms", _elapsed_ms(step_t0))

    step_t0 = time.perf_counter()
    report = aggregate(rows)
    logger.debug("Step aggregate took %.2f ms", _elapsed_ms(step_t0))

    step_t0 = time.perf_counter()
    write_json(report, dst)
    logger.debug("Step write_json took %.2f ms", _elapsed_ms(step_t0))

    logger.info("Pipeline finished in %.2f ms", _elapsed_ms(pipeline_t0))


if __name__ == "__main__":
    main()

日志配置示例

  • 基于 dictConfig 的“详细(detailed)+ 文件输出 + DEBUG 级别”示例,可直接替换 configure_logging 使用:
import logging
import logging.config
from pathlib import Path

def setup_logging_dictconfig(log_file="logs/app.log", level="DEBUG"):
    Path(log_file).parent.mkdir(parents=True, exist_ok=True)
    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": False,
            "formatters": {
                "detailed": {
                    "format": "%(asctime)s | %(levelname)s | pid=%(process)d tid=%(thread)d | "
                              "%(name)s:%(funcName)s:%(lineno)d | %(message)s",
                    "datefmt": "%Y-%m-%d %H:%M:%S",
                }
            },
            "handlers": {
                "file": {
                    "class": "logging.FileHandler",
                    "level": level,
                    "formatter": "detailed",
                    "filename": log_file,
                    "encoding": "utf-8",
                }
            },
            "root": {"level": level, "handlers": ["file"]},
        }
    )

使用建议

  • 日志级别调整
    • 开发/调试:保持 DEBUG,获得完整的性能与数据质量信息。
    • 生产:建议切换为 INFO 或 WARNING,减少日志量;仅在故障排查时临时开启 DEBUG。
  • 日志轮转
    • 当前示例使用 FileHandler。生产环境建议改用 RotatingFileHandler 或 TimedRotatingFileHandler,并配置最大大小/保留个数,避免磁盘占满。
  • 性能监控
    • 已对每个阶段和整体耗时进行记录。若耗时过长,可进一步在 transform 内部按批次计时,或为特定大数据集引入采样日志。
  • 数据与隐私
    • 对异常行仅输出必要字段(category、amount)。请避免在日志中写入密码、密钥或个人隐私信息。
  • 可扩展性
    • 若日志过于冗长,可对聚合结果和中间结果仅输出前 N 条预览,或通过环境变量控制预览长度与是否输出数据样本。

代码修改说明

  • 引入标准库 logging,替换原有的 print 输出,统一输出到控制台(console)。
  • 采用分级日志策略:
    • INFO:扫描开始、归档汇总信息
    • DEBUG:单个文件归档成功明细(默认 WARNING 级别下不会输出,避免噪声)
    • ERROR:归档失败错误(包含堆栈,便于定位)
  • 提供 setup_logging 函数,配置标准日志格式(时间戳、级别、模块名、消息),默认级别为 WARNING。
  • 保持原有业务逻辑不变,未引入性能监控代码。

增强后的完整代码

import argparse
import shutil
from pathlib import Path
import time
import logging

logger = logging.getLogger(__name__)


def setup_logging(level=logging.WARNING):
    """Configure console logging with a standard format."""
    root = logging.getLogger()
    if root.handlers:
        # Avoid duplicate handlers if setup is called multiple times
        return
    handler = logging.StreamHandler()
    formatter = logging.Formatter(
        fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s"
    )
    handler.setFormatter(formatter)
    root.addHandler(handler)
    root.setLevel(level)


def scan_and_archive(src, archive):
    logger.info("Scanning %s ...", src)

    src_p = Path(src)
    archive_p = Path(archive)
    archive_p.mkdir(parents=True, exist_ok=True)

    processed = 0
    for p in src_p.glob('*.log'):
        try:
            dest = archive_p / f'{p.stem}_{int(time.time())}.log'
            shutil.copy2(p, dest)
            logger.debug("Archived %s -> %s", p.name, dest.name)
            processed += 1
        except Exception:
            # Log full stack for diagnostics; paths are non-sensitive
            logger.exception("Archive error for %s", p)
    logger.info("Archived %d files from %s to %s", processed, src, archive)


def cli():
    parser = argparse.ArgumentParser(description='archive recent log files')
    parser.add_argument('--source', required=True, help='directory of logs')
    parser.add_argument('--archive', required=True, help='archive directory')
    parser.add_argument('--interval', type=int, default=60, help='scan interval seconds')
    parser.add_argument('--once', action='store_true', help='run once then exit')
    args = parser.parse_args()

    logger.info("Starting archiver with args: %s", args)

    while True:
        scan_and_archive(args.source, args.archive)
        if args.once:
            break
        time.sleep(args.interval)


if __name__ == '__main__':
    # Default to WARNING level per requirement; console output only
    setup_logging(logging.WARNING)
    cli()

日志配置示例

import logging

def setup_logging(level=logging.WARNING):
    handler = logging.StreamHandler()  # console
    formatter = logging.Formatter(
        fmt="%(asctime)s %(levelname)s [%(name)s] %(message)s"
    )
    handler.setFormatter(formatter)

    root = logging.getLogger()
    if not root.handlers:
        root.addHandler(handler)
    root.setLevel(level)

# 使用示例
# setup_logging(logging.WARNING)  # 默认,仅输出 WARNING/ERROR/CRITICAL
# setup_logging(logging.INFO)     # 想查看扫描与汇总信息时使用
# setup_logging(logging.DEBUG)    # 排查问题时查看每个文件的归档明细

使用建议

  • 默认级别为 WARNING:仅输出错误等重要信息,适合生产环境。
  • 如果需要查看运行过程:
    • 设置为 INFO:会看到每轮扫描开始与归档汇总数量。
    • 设置为 DEBUG:会看到每个文件的归档成功明细(可能较多,慎用于高频或大目录环境)。
  • 异常日志使用 logger.exception,包含堆栈,便于快速定位问题。
  • 请勿在日志中记录敏感信息(如密码、密钥等);当前日志仅包含路径与文件名。

示例详情

解决的问题

用一次对话,让任意 Python 项目快速拥有生产级日志能力。通过自动识别关键路径与异常点、按规范插入日志、统一输出格式与目标,帮助研发团队在几分钟内完成从零到一或从混乱到标准的升级。适用于 Web 服务、数据任务与自动化脚本,既保留原有业务逻辑不变,又显著提升可观测性与可维护性。

  • 让新旧项目在极短时间内具备清晰、可追踪、可审计的运行记录
  • 将零散的 print 升级为结构化、分级可控的日志策略
  • 按需开启性能监控,定位慢点与瓶颈更高效
  • 自定义日志级别、格式与输出目标,满足不同环境与合规要求
  • 产出修改说明、增强后完整代码、配置示例与使用建议,一次交付即可落地 试用路径:贴上你的代码与偏好(级别、输出、格式、是否监控),即可获得可直接替换的增强版本;升级使用可扩展到更多模块与团队协作场景,持续提升交付与运维效率。

适用用户

Python后端工程师

为接口与服务补齐请求链路、状态与异常日志,定位慢请求与超时调用,发布当天即可缩短排查时间。

数据工程师

为批处理、数据管道与校验流程自动记录阶段进度、输入输出量与失败详情,支持断点重跑与追踪来源。

自动化脚本作者/运维

为定时与临时脚本生成统一日志,异常含参数与环境信息,可输出到文件或平台,便于告警与溯源。

特征总结

一键为Python代码植入规范日志,自动识别关键路径与异常点,快速定位问题
按场景智能分配日志级别,默认即合理,避免刷屏与漏报,线上线下都清晰
自动为风险位置补齐错误日志,带参数与上下文,现场复现更轻松高效
可选开启性能监控,记录耗时与瓶颈环节,为加速与扩容提供直接依据
支持自定义日志格式与输出目标,一键切换到控制台、文件或第三方平台
保持业务逻辑零侵入,遵循编码规范整理输出,让代码更整洁、易维护
附带可复制配置与实践建议,按需调整级别和字段,迅速融入现有项目流程
覆盖Web、数据处理与自动化脚本等场景,统一链路记录,提升团队协作排障效率
模板化参数输入:粘贴代码、选择级别与格式,立即生成增强版本,开箱即用

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 572 tokens
- 5 个可调节参数
{ Python代码 } { 日志级别 } { 输出目标 } { 日志格式 } { 性能监控 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59