Programming Assistant Expert

Updated Oct 29, 2025

Provides programming guidance and best practices, and helps solve common problems.

Below is a ready-to-run Python 3 CLI script, filter_orders.py, that meets your requirements. Save it as filter_orders.py and run it with the example command.

#!/usr/bin/env python3 """ Filter orders CSV by status, min amount, and date range.

Reads a UTF-8 CSV (with header) containing fields: order_id, user_id, amount, status, currency, created_at created_at may be ISO8601 or "YYYY-MM-DD HH:MM:SS".

Features:

  • Stream processing using built-in csv and datetime (no third-party libs).
  • Filters:
    • status in --status (comma separated list)
    • amount >= --min-amount
    • created_at in [--start-date, --end-date] closed interval
  • I/O via CLI args:
    • --input input path (required)
    • --output output path (required)
    • --delimiter (default ',')
    • --keep-columns (comma separated; default: keep all input columns)
  • Skips rows with invalid amount/created_at and counts skip reasons.
  • Output keeps header; formats amount to two decimals; uppercases currency.
  • Prints stats to stderr at the end.

Usage example: python filter_orders.py --input orders.csv --output filtered.csv --min-amount 100 --status paid,shipped --start-date 2024-01-01 --end-date 2024-12-31 --keep-columns order_id,user_id,amount,created_at """

import argparse import csv import sys from decimal import Decimal, InvalidOperation, ROUND_HALF_UP from datetime import datetime, date, time, timezone

REQUIRED_FIELDS = ["order_id", "user_id", "amount", "status", "currency", "created_at"]

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        prog="filter_orders.py",
        description="Filter orders CSV by status, minimum amount, and date range.",
    )
    parser.add_argument("--input", required=True, help="Input CSV file path (UTF-8).")
    parser.add_argument("--output", required=True, help="Output CSV file path (UTF-8).")
    parser.add_argument("--delimiter", default=",", help="CSV delimiter. Default: ','")
    parser.add_argument("--status", help="Comma-separated status values to include (case-insensitive).")
    parser.add_argument("--min-amount", help="Minimum amount (float/decimal) for filtering. Example: 100.0")
    parser.add_argument("--start-date", help="Start date/time (ISO8601 or 'YYYY-MM-DD HH:MM:SS'). Inclusive.")
    parser.add_argument("--end-date", help="End date/time (ISO8601 or 'YYYY-MM-DD HH:MM:SS'). Inclusive.")
    parser.add_argument("--keep-columns", help="Comma-separated columns to keep in output. Default: keep all.")
    return parser.parse_args()

def parse_status_list(s: str):
    if not s:
        return None
    return {item.strip().lower() for item in s.split(",") if item.strip()}

def parse_decimal(s: str) -> Decimal:
    return Decimal(s)

def quantize_2dp(d: Decimal) -> Decimal:
    return d.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

def _fromisoformat_loose(dt_str: str) -> datetime:
    """
    A slightly more forgiving ISO8601 parser:
    - Supports trailing 'Z' by converting to '+00:00'
    - Supports date-only strings (returns midnight)
    - Supports 'YYYY-MM-DD HH:MM:SS' and 'YYYY-MM-DDTHH:MM:SS[±HH:MM]'
    """
    s = dt_str.strip()
    if not s:
        raise ValueError("Empty datetime string")

    # Handle trailing 'Z' (UTC)
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"

    # Try datetime.fromisoformat directly
    try:
        dt = datetime.fromisoformat(s)
        return dt
    except ValueError:
        # Try date-only
        try:
            d = date.fromisoformat(s)
            return datetime.combine(d, time())
        except ValueError:
            # Last resort: try replacing ' ' with 'T'
            if " " in s:
                try:
                    dt = datetime.fromisoformat(s.replace(" ", "T"))
                    return dt
                except ValueError:
                    pass
            raise

def normalize_to_naive_utc(dt: datetime) -> datetime:
    """
    Normalize datetime to a naive UTC datetime (tz removed) for consistent comparisons.
    If dt has tzinfo, convert to UTC and strip tz; otherwise return as-is.
    """
    if dt.tzinfo is not None:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt

def parse_bound_datetime(s: str, is_end: bool) -> datetime:
    """
    Parse start/end bound. If date-only:
    - start: set to 00:00:00
    - end: set to 23:59:59.999999
    Normalize timezone-aware times to naive UTC.
    """
    dt = _fromisoformat_loose(s)
    # If user passed a pure date, ensure appropriate time component
    if dt.time() == time() and s.strip()[:10] == s.strip():  # likely pure date
        if is_end:
            dt = dt.replace(hour=23, minute=59, second=59, microsecond=999999)
        else:
            dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return normalize_to_naive_utc(dt)

def main():
    args = parse_args()

    status_set = parse_status_list(args.status)

    min_amount_dec = None
    if args.min_amount is not None:
        try:
            # Use Decimal for robust comparison/formatting
            min_amount_dec = parse_decimal(args.min_amount)
        except InvalidOperation:
            sys.stderr.write(f"Error: --min-amount '{args.min_amount}' is not a valid number.\n")
            sys.exit(2)

    start_dt = None
    end_dt = None
    try:
        if args.start_date:
            start_dt = parse_bound_datetime(args.start_date, is_end=False)
        if args.end_date:
            end_dt = parse_bound_datetime(args.end_date, is_end=True)
    except ValueError as e:
        sys.stderr.write(f"Error: Invalid date/time bound: {e}\n")
        sys.exit(2)

    # Open input/output in streaming mode
    try:
        infile = open(args.input, "r", encoding="utf-8-sig", newline="")
    except OSError as e:
        sys.stderr.write(f"Error: Cannot open input file: {e}\n")
        sys.exit(2)

    try:
        outfile = open(args.output, "w", encoding="utf-8", newline="")
    except OSError as e:
        infile.close()
        sys.stderr.write(f"Error: Cannot open output file: {e}\n")
        sys.exit(2)

    with infile, outfile:
        reader = csv.DictReader(infile, delimiter=args.delimiter)
        if reader.fieldnames is None:
            sys.stderr.write("Error: Input CSV appears to have no header.\n")
            sys.exit(2)

        # Ensure required fields exist
        missing_required = [f for f in REQUIRED_FIELDS if f not in reader.fieldnames]
        if missing_required:
            sys.stderr.write(
                "Error: Input CSV missing required columns: "
                + ", ".join(missing_required) + "\n"
            )
            sys.exit(2)

        # Determine output columns
        if args.keep_columns:
            keep_cols = [c.strip() for c in args.keep_columns.split(",") if c.strip()]
            missing_keep = [c for c in keep_cols if c not in reader.fieldnames]
            if missing_keep:
                sys.stderr.write(
                    "Error: --keep-columns includes columns not found in input: "
                    + ", ".join(missing_keep) + "\n"
                )
                sys.exit(2)
            out_fields = keep_cols
        else:
            out_fields = reader.fieldnames

        writer = csv.DictWriter(outfile, fieldnames=out_fields, delimiter=args.delimiter)
        writer.writeheader()

        # Stats
        total_rows = 0
        matched_rows = 0
        skipped_invalid_rows = 0
        invalid_amount_count = 0
        invalid_created_at_count = 0

        # Stream rows
        for row in reader:
            total_rows += 1

            # Validate amount
            raw_amount = (row.get("amount") or "").strip()
            try:
                amount_dec = parse_decimal(raw_amount)
            except (InvalidOperation, AttributeError):
                skipped_invalid_rows += 1
                invalid_amount_count += 1
                continue

            # Validate created_at
            raw_created = (row.get("created_at") or "").strip()
            try:
                created_dt = _fromisoformat_loose(raw_created)
                created_dt = normalize_to_naive_utc(created_dt)
            except ValueError:
                skipped_invalid_rows += 1
                invalid_created_at_count += 1
                continue

            # Apply filters
            if status_set is not None:
                row_status = (row.get("status") or "").strip().lower()
                if row_status not in status_set:
                    continue

            if min_amount_dec is not None and amount_dec < min_amount_dec:
                continue

            if start_dt is not None and created_dt < start_dt:
                continue

            if end_dt is not None and created_dt > end_dt:
                continue

            # Prepare output row with formatting
            out_row = {}
            for col in out_fields:
                val = row.get(col, "")
                if col == "amount":
                    # Format to two decimals
                    val = str(quantize_2dp(amount_dec))
                elif col == "currency":
                    val = (val or "").upper()
                # Keep created_at as original string (already validated)
                out_row[col] = val

            writer.writerow(out_row)
            matched_rows += 1

        # Stats to stderr
        sys.stderr.write(
            "Processing summary:\n"
            f"  Total rows (excluding header): {total_rows}\n"
            f"  Matched rows written:         {matched_rows}\n"
            f"  Skipped invalid rows:         {skipped_invalid_rows}\n"
            "  Reasons:\n"
            f"    invalid_amount:             {invalid_amount_count}\n"
            f"    invalid_created_at:         {invalid_created_at_count}\n"
        )

if __name__ == "__main__":
    main()

Usage example: python filter_orders.py --input orders.csv --output filtered.csv --min-amount 100 --status paid,shipped --start-date 2024-01-01 --end-date 2024-12-31 --keep-columns order_id,user_id,amount,created_at

Additional suggestions and best-practice notes:

  • Use Decimal for amounts to avoid floating-point error, and quantize the output uniformly to two decimal places (see the short sketch after this list).
  • When parsing date/times, prefer fromisoformat and normalize to "naive UTC" to avoid comparison errors.
  • Stream processing avoids loading the whole file into memory; csv.DictReader/DictWriter are efficient choices.
  • Argument validation should fail clearly with readable error messages and return a non-zero exit code when appropriate.
  • In the output statistics, distinguish rows skipped because of invalid data from rows that simply did not match the filters; this helps later data-quality triage. As required, the script above only counts skip reasons for invalid data.
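To make the first two points concrete, here is a minimal standalone sketch (not part of the script above; the sample values are only illustrative) of two-decimal quantization and naive-UTC normalization:

from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime, timezone

# Exact decimal arithmetic: 100.005 rounds half-up to two decimal places
amount = Decimal("100.005").quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
print(amount)  # 100.01

# Convert an offset-aware timestamp to naive UTC before comparing it with naive bounds
aware = datetime.fromisoformat("2024-06-01T12:00:00+02:00")
naive_utc = aware.astimezone(timezone.utc).replace(tzinfo=None)
print(naive_utc)  # 2024-06-01 10:00:00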

Below is a ready-to-run Python 3 script, clean_users.py, that meets all of your requirements. It uses only the standard library, processes the input as a stream, deduplicates by user_id keeping the record with the latest last_login, validates emails and filters disposable domains, applies the filters given as input arguments, and writes a CSV plus a .summary.json overview with the same base name.

Full implementation (save as clean_users.py):

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
clean_users.py
Clean and deduplicate a users CSV (UTF-8, with header) as required.

Arguments:
  --input:     input CSV file path
  --output:    output CSV file path
  --roles:     comma-separated; keep only the given roles (optional)
  --before:    date (YYYY-MM-DD); keep only users whose last_login is before this date (required)
  --min-score: minimum activity score; keep activity_score >= value (optional, default 0)

Processing logic (key points):
  - Email validation: regex + disposable-domain blacklist (small built-in TXT); invalid rows are dropped and counted.
  - Deduplication: per user_id keep the record with the latest last_login; replaced older entries count as "deduplicated_older".
  - Filtering: the role, before, and min-score filters are applied after deduplication.
  - New column inactive_days = number of days between the --before date and last_login.
  - Output: original column order preserved, CSV with header; a .summary.json with statistics is written alongside.
  - No third-party libraries; uses csv/argparse/datetime/json from the standard library, streaming reads.

Notes:
  - Only rows that pass basic validation (email / disposable domain / parsable last_login / parsable score) take part in deduplication.
  - Role, date, and score filters are applied after deduplication so that the "keep the latest last_login" semantics are not affected by earlier filtering.
"""

import argparse
import csv
import datetime
import json
import os
import re
import sys
from collections import Counter

# Simple email regex (simplified RFC form)
EMAIL_REGEX = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")

# Built-in disposable email domain blacklist (small TXT)
DISPOSABLE_DOMAINS_TXT = """
# common disposable domains (small list)
mailinator.com
10minutemail.com
temp-mail.org
guerrillamail.com
yopmail.com
trashmail.com
getnada.com
dispostable.com
tempmail.io
moakt.com
maildrop.cc
mytemp.email
fakeinbox.com
"""  # extend as needed

REQUIRED_FIELDS = ["user_id", "email", "role", "last_login", "activity_score"]

def load_blacklist(txt: str):
    bl = set()
    for line in txt.splitlines():
        s = line.strip().lower()
        if not s or s.startswith("#"):
            continue
        bl.add(s)
    return bl

def validate_email(email: str, blacklist: set):
    """
    Return (is_valid, reason_or_None).
    Distinguishes two failure reasons: invalid_email vs disposable_domain.
    """
    if email is None:
        return False, "invalid_email"
    email = email.strip()
    if not EMAIL_REGEX.fullmatch(email):
        return False, "invalid_email"
    try:
        domain = email.rsplit("@", 1)[1].lower()
    except Exception:
        return False, "invalid_email"
    # Exact match or suffix match (e.g. sub.mailinator.com)
    if domain in blacklist or any(domain.endswith("." + b) for b in blacklist):
        return False, "disposable_domain"
    # Simple defence: reject consecutive dots
    if ".." in domain:
        return False, "invalid_email"
    return True, None

def parse_last_login(s: str):
    """
    Try to parse last_login as a datetime; supports ISO format plus common variants.
    Returns a datetime or None.
    """
    if s is None:
        return None
    s = s.strip()
    if not s:
        return None
    s2 = s.replace("Z", "+00:00")
    try:
        return datetime.datetime.fromisoformat(s2)
    except ValueError:
        pass
    for fmt in (
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d",
        "%Y/%m/%d %H:%M:%S",
    ):
        try:
            return datetime.datetime.strptime(s, fmt)
        except ValueError:
            continue
    return None

def parse_before_date(s: str):
    try:
        return datetime.date.fromisoformat(s)
    except Exception:
        raise ValueError("Invalid --before date format, expected YYYY-MM-DD")

def main():
    parser = argparse.ArgumentParser(description="Clean and deduplicate users CSV.")
    parser.add_argument("--input", required=True, help="Input CSV (UTF-8, with header)")
    parser.add_argument("--output", required=True, help="Output CSV path")
    parser.add_argument(
        "--roles",
        help="Comma-separated role list to keep (case-sensitive). Example: dev,qa",
    )
    parser.add_argument(
        "--before",
        required=True,
        help="Keep users whose last_login is before this date (YYYY-MM-DD)",
    )
    parser.add_argument(
        "--min-score",
        type=float,
        default=0.0,
        help="Keep users with activity_score >= this value (default: 0)",
    )
    args = parser.parse_args()

    try:
        before_date = parse_before_date(args.before)
    except ValueError as e:
        print(str(e), file=sys.stderr)
        sys.exit(1)

    roles_set = None
    if args.roles:
        roles_set = {r.strip() for r in args.roles.split(",") if r.strip()}

    blacklist = load_blacklist(DISPOSABLE_DOMAINS_TXT)
    counters = Counter()

    best_by_user = {}  # user_id -> {row, last_login_dt, score, pos}

    # Read, run basic validation, and deduplicate (keep the latest last_login)
    try:
        with open(args.input, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            if reader.fieldnames is None:
                print("Input CSV missing header.", file=sys.stderr)
                sys.exit(1)
            # Check required fields
            missing = [fld for fld in REQUIRED_FIELDS if fld not in reader.fieldnames]
            if missing:
                print(
                    "Missing required fields in CSV header: {}".format(", ".join(missing)),
                    file=sys.stderr,
                )
                sys.exit(1)

            fieldnames = list(reader.fieldnames)  # preserve original column order
            out_fieldnames = fieldnames + ["inactive_days"]

            for idx, row in enumerate(reader, 1):
                counters["total_rows"] += 1

                # Normalize whitespace on required fields
                for key in REQUIRED_FIELDS:
                    if key in row and row[key] is not None:
                        row[key] = row[key].strip()

                user_id = row["user_id"]

                # Email validation + blacklisted domains
                ok, reason = validate_email(row.get("email"), blacklist)
                if not ok:
                    counters[reason] += 1
                    continue

                # Parse last_login
                last_dt = parse_last_login(row.get("last_login"))
                if last_dt is None:
                    counters["invalid_last_login_format"] += 1
                    continue

                # Parse activity_score as a number (min-score is not applied here)
                try:
                    score_val = float(row.get("activity_score", ""))
                except (TypeError, ValueError):
                    counters["invalid_activity_score"] += 1
                    continue

                prev = best_by_user.get(user_id)
                if prev is None:
                    best_by_user[user_id] = {
                        "row": row,
                        "last_login_dt": last_dt,
                        "score": score_val,
                        "pos": idx,
                    }
                else:
                    # Keep the record with the latest last_login
                    if last_dt > prev["last_login_dt"]:
                        # The replaced older entry counts as a dedup discard
                        counters["deduplicated_older"] += 1
                        best_by_user[user_id] = {
                            "row": row,
                            "last_login_dt": last_dt,
                            "score": score_val,
                            "pos": idx,
                        }
                    else:
                        counters["deduplicated_older"] += 1

    except FileNotFoundError:
        print(f"Input file not found: {args.input}", file=sys.stderr)
        sys.exit(1)

    # Write results (apply the final filters: roles, min-score, before)
    rows_kept = 0
    out_dir = os.path.dirname(os.path.abspath(args.output)) or "."
    os.makedirs(out_dir, exist_ok=True)

    try:
        with open(args.output, "w", encoding="utf-8", newline="") as out_f:
            writer = csv.DictWriter(out_f, fieldnames=out_fieldnames)
            writer.writeheader()

            # Sort by pos to roughly preserve first-seen order (using the read position of the kept record)
            for item in sorted(best_by_user.values(), key=lambda x: x["pos"]):
                row = item["row"]

                if roles_set is not None and row.get("role") not in roles_set:
                    counters["role_excluded"] += 1
                    continue

                # Score filter
                if item["score"] < args.min_score:
                    counters["score_too_low"] += 1
                    continue

                # Date filter: keep only users whose last_login is before before_date
                last_login_date = item["last_login_dt"].date()
                if not (last_login_date < before_date):
                    counters["last_login_not_before"] += 1
                    continue

                inactive_days = (before_date - last_login_date).days
                # Output row: original column order plus inactive_days
                out_row = {k: row.get(k, "") for k in out_fieldnames}
                out_row["inactive_days"] = inactive_days
                writer.writerow(out_row)
                rows_kept += 1
    except OSError as e:
        print(f"Failed to write output: {e}", file=sys.stderr)
        sys.exit(1)

    # Write the summary JSON (same directory and base name as the output, with a .summary.json extension)
    summary_path = os.path.splitext(os.path.abspath(args.output))[0] + ".summary.json"
    summary = {
        "input": os.path.abspath(args.input),
        "output": os.path.abspath(args.output),
        "total_rows": counters.get("total_rows", 0),
        "rows_kept": rows_kept,
        "discard_reasons": {
            k: v
            for k, v in counters.items()
            if k
            in {
                "invalid_email",
                "disposable_domain",
                "invalid_last_login_format",
                "invalid_activity_score",
                "deduplicated_older",
                "role_excluded",
                "last_login_not_before",
                "score_too_low",
            }
        },
        "filters": {
            "roles": sorted(list(roles_set)) if roles_set is not None else None,
            "before": before_date.isoformat(),
            "min_score": args.min_score,
        },
    }
    try:
        with open(summary_path, "w", encoding="utf-8") as jf:
            json.dump(summary, jf, ensure_ascii=False, indent=2)
    except OSError as e:
        print(f"Failed to write summary: {e}", file=sys.stderr)
        sys.exit(1)

if __name__ == "__main__":
    main()

Argument reference:

  • --input: input CSV file path (UTF-8, with header); must contain at least the fields user_id, email, role, last_login, activity_score.
  • --output: output CSV file path. A .summary.json overview file with the same base name is written to the same directory.
  • --roles: comma-separated list of roles to keep (omit to skip role filtering).
  • --before: required, format YYYY-MM-DD. Only users whose last_login is earlier than this date are kept; it is also used to compute inactive_days.
  • --min-score: minimum activity-score threshold; rows with activity_score >= this value are kept. Default 0.

Discard-reason statistics (discard_reasons in summary.json; a short read-back sketch follows this list):

  • invalid_email: the email format is invalid.
  • disposable_domain: the email domain is on the disposable-domain blacklist.
  • invalid_last_login_format: last_login cannot be parsed as a date/time.
  • invalid_activity_score: activity_score is not numeric.
  • deduplicated_older: an older duplicate record was dropped during deduplication.
  • role_excluded: the role is not in the set given by --roles.
  • last_login_not_before: last_login is not earlier than the --before date.
  • score_too_low: activity_score is below --min-score.
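To inspect these counters after a run, here is a small read-back sketch (it assumes the output path from the example command below, so the summary file is users_clean.summary.json):

import json

# Load the summary written next to the output CSV and print the counters
with open("users_clean.summary.json", encoding="utf-8") as f:
    summary = json.load(f)

print("rows kept:", summary["rows_kept"])
for reason, count in summary["discard_reasons"].items():
    print(f"  {reason}: {count}")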

Example command: python clean_users.py --input users.csv --output users_clean.csv --roles dev,qa --before 2024-10-01 --min-score 60

Additional suggestions (coding best practices in brief):

  • Keep the input/output encoding and the newline setting explicit to avoid platform-dependent CSV compatibility issues (see the short sketch after this list).
  • Define the regex and blacklist constants in one place so they are easy to maintain and extend.
  • Be tolerant of common time formats when parsing, but record the filter settings and result statistics clearly in the summary for auditing and debugging.
  • Separate deduplication from filtering: first run the basic validity checks and apply the "keep the latest last_login" rule, then apply the business filters, so the semantics stay clear and consistent.
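For the first point, a minimal sketch of the explicit encoding/newline pattern the script relies on (the file names here are placeholders):

import csv

# newline="" lets the csv module manage line endings itself; the encoding is stated explicitly
with open("users.csv", "r", encoding="utf-8", newline="") as fin, \
     open("users_copy.csv", "w", encoding="utf-8", newline="") as fout:
    reader = csv.DictReader(fin)
    writer = csv.DictWriter(fout, fieldnames=reader.fieldnames)
    writer.writeheader()
    for row in reader:
        writer.writerow(row)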

Below is a Python 3 script, log_filter.py, that meets your requirements. It auto-detects gzip compression, filters CSV/TSV logs line by line in streaming fashion, and supports a time window, level, service, and regex filters, optional column selection, statistics on stderr, and a --stats-only mode. No third-party libraries are used.

Save it as log_filter.py and run your example command.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import csv
import gzip
import io
import os
import re
import sys
from datetime import datetime, timezone, timedelta

REQUIRED_COLUMNS = ["ts", "level", "service", "trace_id", "message"]

def detect_gzip(path):
    try:
        with open(path, "rb") as f:
            sig = f.read(2)
        return sig == b"\x1f\x8b"
    except Exception:
        return False

def open_input(path, is_gzip, encoding="utf-8"):
    if is_gzip:
        return gzip.open(path, "rt", encoding=encoding, newline="")
    else:
        return open(path, "r", encoding=encoding, newline="")

def open_output(path, encoding="utf-8"):
    return open(path, "w", encoding=encoding, newline="")

ISO8601_RE = re.compile(
    r"^(\d{4})-(\d{2})-(\d{2})"
    r"T(\d{2}):(\d{2}):(\d{2})"
    r"(?:\.(\d{1,6}))?"
    r"(Z|[+-]\d{2}:\d{2})?$"
)

def parse_iso8601_to_epoch(s):
    # Parse common ISO8601 forms: YYYY-MM-DDTHH:MM:SS[.ffffff][Z|±HH:MM]
    # Assumes naive timestamps are UTC.
    if not s:
        return None
    m = ISO8601_RE.match(s.strip())
    if not m:
        return None
    year, mon, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
    hour, minute, sec = int(m.group(4)), int(m.group(5)), int(m.group(6))
    frac = m.group(7)
    tz = m.group(8)

    micro = 0
    if frac:
        # Pad to 6 digits
        micro = int(frac.ljust(6, "0"))

    tzinfo = timezone.utc
    if tz and tz != "Z":
        sign = 1 if tz[0] == "+" else -1
        tzh = int(tz[1:3])
        tzm = int(tz[4:6])
        tzinfo = timezone(sign * timedelta(hours=tzh, minutes=tzm))

    try:
        dt = datetime(year, mon, day, hour, minute, sec, microsecond=micro, tzinfo=tzinfo)
        # Convert to epoch seconds
        return dt.timestamp()
    except Exception:
        return None

def parse_levels(values):
    # Support multiple values via nargs and/or comma-separated
    if not values:
        return None
    out = []
    for v in values:
        if v is None:
            continue
        parts = [p.strip() for p in v.split(",") if p.strip()]
        out.extend(parts)
    out = [x.upper() for x in out]
    return set(out) if out else None

def build_arg_parser():
    epilog = (
        "Examples:\n"
        "  1) Basic filtering and export:\n"
        "     python log_filter.py --input app.csv.gz --output err.csv --levels ERROR --regex \"timeout|failed\" \\\n"
        "       --start 2025-01-01T00:00:00 --end 2025-01-31T23:59:59 --select-columns ts,level,service,message\n"
        "  2) Keep INFO and WARN for service api:\n"
        "     python log_filter.py --input logs.tsv --format tsv --output filtered.tsv --levels INFO,WARN --service api\n"
        "  3) Statistics only (no output file):\n"
        "     python log_filter.py --input app.csv.gz --levels ERROR --stats-only\n"
        "Notes:\n"
        "  - gzip input is detected automatically; the file extension is not used.\n"
        "  - Timestamps must be ISO8601, e.g. 2025-01-01T00:00:00, optionally with a Z/±HH:MM offset; naive timestamps are parsed as UTC.\n"
        "  - --select-columns exports only the given columns, preserving their original order from the input header.\n"
        "  - Rows with an unparsable timestamp or a missing required column (ts,level,service,trace_id,message) are counted as discarded.\n"
    )
    p = argparse.ArgumentParser(
        prog="log_filter.py",
        description="Efficient log filtering: stream CSV/TSV logs with level, regex, time-window, and service filters; statistics are written to stderr.",
        epilog=epilog,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    p.add_argument("--input", required=True, help="Input log file path (gzip supported)")
    p.add_argument("--output", help="Output file path; may be omitted with --stats-only")
    p.add_argument("--format", choices=["csv", "tsv"], default="csv", help="Input/output delimiter format (default: csv)")
    p.add_argument("--levels", nargs="+", help="Levels to keep (multiple values), e.g. --levels INFO ERROR or --levels INFO,ERROR")
    p.add_argument("--regex", help="Regex matched against the message field (re.search)")
    p.add_argument("--start", help="Start time (ISO8601), e.g. 2025-01-01T00:00:00")
    p.add_argument("--end", help="End time (ISO8601), e.g. 2025-01-31T23:59:59")
    p.add_argument("--service", help="Exact service name to match")
    p.add_argument("--select-columns", help="Columns to export (comma-separated), e.g. ts,level,service,message")
    p.add_argument("--stats-only", action="store_true", help="Print statistics to stderr only; do not write an output file")
    return p

def process_file(args):
    # Detect gzip
    is_gzip = detect_gzip(args.input)

    # Validate output path unless stats-only
    if not args.stats_only and not args.output:
        print("Error: --output not provided and --stats-only not enabled", file=sys.stderr)
        sys.exit(2)

    # Prepare filters
    level_set = parse_levels(args.levels)
    regex = re.compile(args.regex) if args.regex else None

    start_epoch = parse_iso8601_to_epoch(args.start) if args.start else None
    if args.start and start_epoch is None:
        print(f"Error: cannot parse --start time: {args.start}", file=sys.stderr)
        sys.exit(2)

    end_epoch = parse_iso8601_to_epoch(args.end) if args.end else None
    if args.end and end_epoch is None:
        print(f"Error: cannot parse --end time: {args.end}", file=sys.stderr)
        sys.exit(2)

    # Prepare reader and writer
    delimiter = "," if args.format == "csv" else "\t"

    try:
        fin = open_input(args.input, is_gzip)
    except Exception as e:
        print(f"Error: cannot open input file: {e}", file=sys.stderr)
        sys.exit(1)

    fout = None
    writer = None
    if not args.stats_only:
        try:
            fout = open_output(args.output)
            writer = csv.writer(fout, delimiter=delimiter, lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
        except Exception as e:
            print(f"Error: cannot open output file: {e}", file=sys.stderr)
            fin.close()
            sys.exit(1)

    reader = csv.reader(fin, delimiter=delimiter)
    # Read header
    try:
        header = next(reader)
    except StopIteration:
        print("Error: input file is empty", file=sys.stderr)
        fin.close()
        if fout:
            fout.close()
        sys.exit(1)
    except Exception as e:
        print(f"Error: failed to read header: {e}", file=sys.stderr)
        fin.close()
        if fout:
            fout.close()
        sys.exit(1)

    # Map required columns
    col_index = {name: None for name in REQUIRED_COLUMNS}
    name_to_idx = {name: idx for idx, name in enumerate(header)}

    missing_in_header = [name for name in REQUIRED_COLUMNS if name not in name_to_idx]
    if missing_in_header:
        print(f"Error: header is missing required columns: {', '.join(missing_in_header)}", file=sys.stderr)
        fin.close()
        if fout:
            fout.close()
        sys.exit(1)

    for name in REQUIRED_COLUMNS:
        col_index[name] = name_to_idx[name]

    # Prepare selected columns (preserving original header order)
    if args.select_columns:
        selected = [c.strip() for c in args.select_columns.split(",") if c.strip()]
        unknown = [c for c in selected if c not in name_to_idx]
        if unknown:
            print(f"Error: --select-columns contains columns not in the input header: {', '.join(unknown)}", file=sys.stderr)
            fin.close()
            if fout:
                fout.close()
            sys.exit(2)
        selected_set = set(selected)
        out_indices = [i for i, col in enumerate(header) if col in selected_set]
    else:
        out_indices = list(range(len(header)))

    # Write header to output
    if writer:
        writer.writerow([header[i] for i in out_indices])

    # Stats
    rows_total = 0
    rows_kept = 0
    rows_discarded = 0
    kept_per_level = {}
    discard_reasons = {
        "missing_required_columns": 0,
        "bad_timestamp": 0,
        "out_of_window": 0,
        "level_mismatch": 0,
        "service_mismatch": 0,
        "regex_no_match": 0,
    }

    # Stream rows
    for row in reader:
        rows_total += 1

        # Validate required columns presence and not empty
        valid = True
        for name, idx in col_index.items():
            if idx >= len(row) or row[idx] == "":
                valid = False
                break
        if not valid:
            rows_discarded += 1
            discard_reasons["missing_required_columns"] += 1
            continue

        ts_str = row[col_index["ts"]]
        level_val = row[col_index["level"]]
        service_val = row[col_index["service"]]
        message_val = row[col_index["message"]]

        # Parse timestamp
        ts_epoch = parse_iso8601_to_epoch(ts_str)
        if ts_epoch is None:
            rows_discarded += 1
            discard_reasons["bad_timestamp"] += 1
            continue

        # Time window
        if start_epoch is not None and ts_epoch < start_epoch:
            rows_discarded += 1
            discard_reasons["out_of_window"] += 1
            continue
        if end_epoch is not None and ts_epoch > end_epoch:
            rows_discarded += 1
            discard_reasons["out_of_window"] += 1
            continue

        # Level filter
        lvl_up = level_val.upper()
        if level_set is not None and lvl_up not in level_set:
            rows_discarded += 1
            discard_reasons["level_mismatch"] += 1
            continue

        # Service filter
        if args.service is not None and service_val != args.service:
            rows_discarded += 1
            discard_reasons["service_mismatch"] += 1
            continue

        # Regex on message
        if regex is not None and not regex.search(message_val):
            rows_discarded += 1
            discard_reasons["regex_no_match"] += 1
            continue

        # Keep row
        rows_kept += 1
        kept_per_level[lvl_up] = kept_per_level.get(lvl_up, 0) + 1

        if writer:
            try:
                writer.writerow([row[i] for i in out_indices])
            except Exception:
                # A failed write stops the run with an error rather than being counted as a discard
                print("Error: failed to write output", file=sys.stderr)
                fin.close()
                fout.close()
                sys.exit(1)

    # Close files
    fin.close()
    if fout:
        fout.close()

    # Print stats to stderr
    print("Statistics:", file=sys.stderr)
    print(f"- Input file: {args.input}", file=sys.stderr)
    print(f"- Compression: {'gzip' if is_gzip else 'none'}", file=sys.stderr)
    print(f"- Format: {args.format}", file=sys.stderr)
    print(f"- Total rows (excluding header): {rows_total}", file=sys.stderr)
    print(f"- Rows kept: {rows_kept}", file=sys.stderr)
    print(f"- Rows discarded: {rows_discarded}", file=sys.stderr)
    print("- Discard reasons:", file=sys.stderr)
    for k in ["missing_required_columns", "bad_timestamp", "out_of_window", "level_mismatch", "service_mismatch", "regex_no_match"]:
        print(f"  * {k}: {discard_reasons[k]}", file=sys.stderr)
    print("- Kept rows per level:", file=sys.stderr)
    if kept_per_level:
        for lvl in sorted(kept_per_level.keys()):
            print(f"  * {lvl}: {kept_per_level[lvl]}", file=sys.stderr)
    else:
        print("  * (none)", file=sys.stderr)

def main():
    parser = build_arg_parser()
    args = parser.parse_args()
    process_file(args)

if __name__ == "__main__":
    main()

Additional notes and practical suggestions:

  • Timestamp parsing treats values without a timezone as UTC. If your logs use local time, normalize them to UTC when they are generated, or include a timezone offset in the ts field.
  • The regex is case-sensitive by default; to ignore case, add the (?i) prefix to the pattern, e.g. --regex "(?i)timeout|failed" (see the short sketch after this list).
  • Streaming plus the csv module keeps memory usage low; avoid reading the whole file into memory.
  • For large files, gzip-compressed input reduces I/O, and exporting only selected columns keeps the output small.
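A minimal sketch of the case-insensitive pattern mentioned above (the sample messages are made up):

import re

# (?i) at the start of the pattern makes the whole match case-insensitive
pattern = re.compile(r"(?i)timeout|failed")
print(bool(pattern.search("Connection TIMEOUT after 30s")))   # True
print(bool(pattern.search("request Failed: upstream error")))  # True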

Example details

Problems it solves

Provides developers with efficient programming support and guidance, addresses day-to-day pain points, and helps them improve code quality, speed up debugging, and choose suitable development tools and technical approaches.

Who it is for

Programming hobbyists

Get up to speed with multiple programming languages quickly, receive real-time guidance and code-optimization suggestions, and finish small projects or single features with ease.

Professional developers

Improve coding efficiency and code quality, resolve technical problems encountered at work, and streamline the current project's development workflow.

Project managers and technical decision-makers

Find the tools, libraries, and frameworks that fit the team or project goals, and drive the team toward more efficient development practices.

Feature summary

One-stop programming guidance with deep support for multiple languages (such as Python, JavaScript, Java, and C++), helping you get started quickly or level up your skills.
Intelligent optimization of code structure, generating concise and efficient code to raise development efficiency and solve tricky coding problems.
Automated troubleshooting that quickly identifies errors or vulnerabilities and offers precise debugging and fix suggestions, saving investigation time.
Recommendations of the best tools, libraries, and frameworks for specific project needs, reducing selection overhead and keeping delivery efficient and reliable.
Programming guidance across domains, from algorithm design to data structures and design patterns, making core concepts easy to grasp.
Real-time sharing of programming best practices, with high-quality suggestions and well-written documentation examples for professional-grade deliverables.
Flexible adaptation to diverse scenarios, from quick scripts to complex system planning.
Support for both beginners and advanced users, with guidance tailored to the scenario so you improve your skills while delivering the project.
Cross-language transfer of techniques, explaining what different languages have in common to deepen your grasp of their logic.

How to use a purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into the chat app you normally use (such as ChatGPT or Claude) and start chatting right away, with no extra development. Suited to quick personal trials and lightweight use.

2. Publish it as an API endpoint

Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Suited to developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in an MCP client so your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting the prompt move seamlessly between AI tools.

Prompt price
¥0
Try before you buy and pay only if it works for you.

What you get after purchase

The complete prompt template
- 107 tokens in total
- 1 adjustable parameter
{ 编程任务请求 } (programming task request)
Usage rights to community-contributed content
- Curated, high-quality community examples to help you get started with the prompt quickly
Free for a limited time
