Popular roles are not just a source of inspiration; they are also your efficiency assistants. With carefully curated role prompts you can quickly generate high-quality content, boost your creative inspiration, and find the solution that best fits your needs. Make creation easier and value more direct!
We continuously update the role library for different user needs, so you can always find the right entry point for inspiration.
Provides coding guidance and best practices, and helps resolve common problems.
Below is a ready-to-run Python 3 CLI script, filter_orders.py, that meets your requirements. Save it as filter_orders.py and run it with the example command.
#!/usr/bin/env python3
"""
Filter orders CSV by status, min amount, and date range.

Reads a UTF-8 CSV (with header) containing fields:
    order_id, user_id, amount, status, currency, created_at
created_at may be ISO8601 or "YYYY-MM-DD HH:MM:SS".

Features:
- Streams rows (constant memory) and validates amount and created_at.
- Filters by status set, minimum amount, and an inclusive date range.
- Optional column projection via --keep-columns.
- Writes a processing summary to stderr.

Usage example:
    python filter_orders.py --input orders.csv --output filtered.csv \
        --min-amount 100 --status paid,shipped \
        --start-date 2024-01-01 --end-date 2024-12-31 \
        --keep-columns order_id,user_id,amount,created_at
"""
import argparse
import csv
import sys
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP
from datetime import datetime, date, time, timezone
REQUIRED_FIELDS = ["order_id", "user_id", "amount", "status", "currency", "created_at"]
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        prog="filter_orders.py",
        description="Filter orders CSV by status, minimum amount, and date range."
    )
    parser.add_argument("--input", required=True, help="Input CSV file path (UTF-8).")
    parser.add_argument("--output", required=True, help="Output CSV file path (UTF-8).")
    parser.add_argument("--delimiter", default=",", help="CSV delimiter. Default: ','")
    parser.add_argument("--status", help="Comma-separated status values to include (case-insensitive).")
    parser.add_argument("--min-amount", help="Minimum amount (float/decimal) for filtering. Example: 100.0")
    parser.add_argument("--start-date", help="Start date/time (ISO8601 or 'YYYY-MM-DD HH:MM:SS'). Inclusive.")
    parser.add_argument("--end-date", help="End date/time (ISO8601 or 'YYYY-MM-DD HH:MM:SS'). Inclusive.")
    parser.add_argument("--keep-columns", help="Comma-separated columns to keep in output. Default: keep all.")
    return parser.parse_args()
def parse_status_list(s: str):
    if not s:
        return None
    return {item.strip().lower() for item in s.split(",") if item.strip()}


def parse_decimal(s: str) -> Decimal:
    return Decimal(s)


def quantize_2dp(d: Decimal) -> Decimal:
    return d.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


def _fromisoformat_loose(dt_str: str) -> datetime:
    """
    A slightly more forgiving ISO8601 parser:
    - Supports trailing 'Z' by converting to '+00:00'
    - Supports date-only strings (returns midnight)
    - Supports 'YYYY-MM-DD HH:MM:SS' and 'YYYY-MM-DDTHH:MM:SS[±HH:MM]'
    """
    s = dt_str.strip()
    if not s:
        raise ValueError("Empty datetime string")
    # Handle trailing 'Z' (UTC)
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    # Try datetime.fromisoformat directly
    try:
        dt = datetime.fromisoformat(s)
        return dt
    except ValueError:
        # Try date-only
        try:
            d = date.fromisoformat(s)
            return datetime.combine(d, time())
        except ValueError:
            # Last resort: try replacing ' ' with 'T'
            if " " in s:
                try:
                    dt = datetime.fromisoformat(s.replace(" ", "T"))
                    return dt
                except ValueError:
                    pass
            raise
def normalize_to_naive_utc(dt: datetime) -> datetime:
    """
    Normalize datetime to a naive UTC datetime (tz removed) for consistent comparisons.
    If dt has tzinfo, convert to UTC and strip tz; otherwise return as-is.
    """
    if dt.tzinfo is not None:
        return dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def parse_bound_datetime(s: str, is_end: bool) -> datetime:
    """
    Parse start/end bound. If date-only:
    - start: set to 00:00:00
    - end: set to 23:59:59.999999
    Normalize timezone-aware times to naive UTC.
    """
    dt = _fromisoformat_loose(s)
    # If user passed a pure date (at most 10 chars), expand the time component
    if dt.time() == time() and s.strip()[:10] == s.strip():
        if is_end:
            dt = dt.replace(hour=23, minute=59, second=59, microsecond=999999)
        else:
            dt = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return normalize_to_naive_utc(dt)
def main():
    args = parse_args()

    status_set = parse_status_list(args.status)

    min_amount_dec = None
    if args.min_amount is not None:
        try:
            # Use Decimal for robust comparison/formatting
            min_amount_dec = parse_decimal(args.min_amount)
        except InvalidOperation:
            sys.stderr.write(f"Error: --min-amount '{args.min_amount}' is not a valid number.\n")
            sys.exit(2)

    start_dt = None
    end_dt = None
    try:
        if args.start_date:
            start_dt = parse_bound_datetime(args.start_date, is_end=False)
        if args.end_date:
            end_dt = parse_bound_datetime(args.end_date, is_end=True)
    except ValueError as e:
        sys.stderr.write(f"Error: Invalid date/time bound: {e}\n")
        sys.exit(2)

    # Open input/output in streaming mode
    try:
        infile = open(args.input, "r", encoding="utf-8-sig", newline="")
    except OSError as e:
        sys.stderr.write(f"Error: Cannot open input file: {e}\n")
        sys.exit(2)
    try:
        outfile = open(args.output, "w", encoding="utf-8", newline="")
    except OSError as e:
        infile.close()
        sys.stderr.write(f"Error: Cannot open output file: {e}\n")
        sys.exit(2)
    with infile, outfile:
        reader = csv.DictReader(infile, delimiter=args.delimiter)
        if reader.fieldnames is None:
            sys.stderr.write("Error: Input CSV appears to have no header.\n")
            sys.exit(2)

        # Ensure required fields exist
        missing_required = [f for f in REQUIRED_FIELDS if f not in reader.fieldnames]
        if missing_required:
            sys.stderr.write(
                "Error: Input CSV missing required columns: "
                + ", ".join(missing_required) + "\n"
            )
            sys.exit(2)

        # Determine output columns
        if args.keep_columns:
            keep_cols = [c.strip() for c in args.keep_columns.split(",") if c.strip()]
            missing_keep = [c for c in keep_cols if c not in reader.fieldnames]
            if missing_keep:
                sys.stderr.write(
                    "Error: --keep-columns includes columns not found in input: "
                    + ", ".join(missing_keep) + "\n"
                )
                sys.exit(2)
            out_fields = keep_cols
        else:
            out_fields = reader.fieldnames

        writer = csv.DictWriter(outfile, fieldnames=out_fields, delimiter=args.delimiter)
        writer.writeheader()
        # Stats
        total_rows = 0
        matched_rows = 0
        skipped_invalid_rows = 0
        invalid_amount_count = 0
        invalid_created_at_count = 0

        # Stream rows
        for row in reader:
            total_rows += 1

            # Validate amount
            raw_amount = (row.get("amount") or "").strip()
            try:
                amount_dec = parse_decimal(raw_amount)
            except (InvalidOperation, AttributeError):
                skipped_invalid_rows += 1
                invalid_amount_count += 1
                continue

            # Validate created_at
            raw_created = (row.get("created_at") or "").strip()
            try:
                created_dt = _fromisoformat_loose(raw_created)
                created_dt = normalize_to_naive_utc(created_dt)
            except ValueError:
                skipped_invalid_rows += 1
                invalid_created_at_count += 1
                continue

            # Apply filters
            if status_set is not None:
                row_status = (row.get("status") or "").strip().lower()
                if row_status not in status_set:
                    continue
            if min_amount_dec is not None and amount_dec < min_amount_dec:
                continue
            if start_dt is not None and created_dt < start_dt:
                continue
            if end_dt is not None and created_dt > end_dt:
                continue

            # Prepare output row with formatting
            out_row = {}
            for col in out_fields:
                val = row.get(col, "")
                if col == "amount":
                    # Format to two decimals
                    val = str(quantize_2dp(amount_dec))
                elif col == "currency":
                    val = (val or "").upper()
                # Keep created_at as original string (already validated)
                out_row[col] = val
            writer.writerow(out_row)
            matched_rows += 1
    # Stats to stderr
    sys.stderr.write(
        "Processing summary:\n"
        f"  Total rows (excluding header): {total_rows}\n"
        f"  Matched rows written: {matched_rows}\n"
        f"  Skipped invalid rows: {skipped_invalid_rows}\n"
        "  Reasons:\n"
        f"    invalid_amount: {invalid_amount_count}\n"
        f"    invalid_created_at: {invalid_created_at_count}\n"
    )


if __name__ == "__main__":
    main()
Usage example: python filter_orders.py --input orders.csv --output filtered.csv --min-amount 100 --status paid,shipped --start-date 2024-01-01 --end-date 2024-12-31 --keep-columns order_id,user_id,amount,created_at
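For a quick sanity check, here is a tiny hypothetical orders.csv (illustrative data only) together with what the example command above would write to filtered.csv:

order_id,user_id,amount,status,currency,created_at
1001,u1,150.5,paid,usd,2024-03-02T10:15:00Z
1002,u2,99.99,paid,usd,2024-03-02 11:00:00
1003,u3,250,refunded,eur,2024-05-10T08:00:00
1004,u4,300,shipped,usd,2023-12-31T23:59:59

Only order 1001 passes every filter (status in paid/shipped, amount >= 100, created within 2024), so the output is:

order_id,user_id,amount,created_at
1001,u1,150.50,2024-03-02T10:15:00Z

Note how amount is re-quantized to two decimals while created_at keeps its original string form.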
Additional suggestions and best-practice notes:
- Use Decimal rather than float for money amounts, as the script does, to avoid binary rounding surprises.
- Stream rows instead of loading the whole file so memory use stays flat on large inputs.
- Keep diagnostics on stderr (as the summary here does) so stdout and the output file stay clean for piping.
Below is a ready-to-run Python 3 script, clean_users.py, that meets all of your requirements. It uses only the standard library, supports streaming, deduplicates by user_id keeping the row with the latest last_login, validates emails and filters disposable domains, applies the input filters, and writes a CSV plus a same-named .summary.json overview.
#!/usr/bin/env python3
""" clean_users.py 按要求清洗与去重用户 CSV(UTF-8,带表头)。
处理逻辑(关键点):
注意:
import argparse
import csv
import datetime
import json
import os
import re
import sys
from collections import Counter
EMAIL_REGEX = re.compile(r"^[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}$")
DISPOSABLE_DOMAINS_TXT = """
mailinator.com 10minutemail.com temp-mail.org guerrillamail.com yopmail.com trashmail.com getnada.com dispostable.com tempmail.io moakt.com maildrop.cc mytemp.email fakeinbox.com """ # 可按需扩展
REQUIRED_FIELDS = ["user_id", "email", "role", "last_login", "activity_score"]
def load_blacklist(txt: str):
    bl = set()
    for line in txt.splitlines():
        s = line.strip().lower()
        if not s or s.startswith("#"):
            continue
        bl.add(s)
    return bl
def validate_email(email: str, blacklist: set):
    """
    Return (is_valid, reason_or_None).
    Distinguishes two failure reasons: invalid_email and disposable_domain.
    """
    if email is None:
        return False, "invalid_email"
    email = email.strip()
    if not EMAIL_REGEX.fullmatch(email):
        return False, "invalid_email"
    try:
        domain = email.rsplit("@", 1)[1].lower()
    except Exception:
        return False, "invalid_email"
    # Exact match or suffix match (e.g. sub.mailinator.com)
    if domain in blacklist or any(domain.endswith("." + b) for b in blacklist):
        return False, "disposable_domain"
    # Simple defence: reject consecutive dots
    if ".." in domain:
        return False, "invalid_email"
    return True, None
def parse_last_login(s: str):
    """
    Try to parse last_login as a datetime; supports ISO format plus common fallbacks.
    Returns a naive datetime (aware values are converted to UTC) or None.
    """
    if s is None:
        return None
    s = s.strip()
    if not s:
        return None
    s2 = s.replace("Z", "+00:00")
    try:
        dt = datetime.datetime.fromisoformat(s2)
        # Normalize aware datetimes to naive UTC so later comparisons never mix
        # aware and naive values (which would raise TypeError)
        if dt.tzinfo is not None:
            dt = dt.astimezone(datetime.timezone.utc).replace(tzinfo=None)
        return dt
    except ValueError:
        pass
    for fmt in (
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M:%S",
        "%Y/%m/%d",
        "%Y/%m/%d %H:%M:%S",
    ):
        try:
            return datetime.datetime.strptime(s, fmt)
        except ValueError:
            continue
    return None
def parse_before_date(s: str):
    try:
        return datetime.date.fromisoformat(s)
    except Exception:
        raise ValueError("Invalid --before date format, expected YYYY-MM-DD")
def main():
    parser = argparse.ArgumentParser(description="Clean and deduplicate users CSV.")
    parser.add_argument("--input", required=True, help="Input CSV (UTF-8, with header)")
    parser.add_argument("--output", required=True, help="Output CSV path")
    parser.add_argument(
        "--roles",
        help="Comma-separated role list to keep (case-sensitive). Example: dev,qa",
    )
    parser.add_argument(
        "--before",
        required=True,
        help="Keep users whose last_login is before this date (YYYY-MM-DD)",
    )
    parser.add_argument(
        "--min-score",
        type=float,
        default=0.0,
        help="Keep users with activity_score >= this value (default: 0)",
    )
    args = parser.parse_args()
    try:
        before_date = parse_before_date(args.before)
    except ValueError as e:
        print(str(e), file=sys.stderr)
        sys.exit(1)

    roles_set = None
    if args.roles:
        roles_set = {r.strip() for r in args.roles.split(",") if r.strip()}

    blacklist = load_blacklist(DISPOSABLE_DOMAINS_TXT)
    counters = Counter()
    best_by_user = {}  # user_id -> {row, last_login_dt, score, pos}
    # Read, validate basics, and deduplicate (keep latest last_login)
    try:
        with open(args.input, "r", encoding="utf-8", newline="") as f:
            reader = csv.DictReader(f)
            if reader.fieldnames is None:
                print("Input CSV missing header.", file=sys.stderr)
                sys.exit(1)
            # Check required fields
            missing = [fld for fld in REQUIRED_FIELDS if fld not in reader.fieldnames]
            if missing:
                print(
                    "Missing required fields in CSV header: {}".format(", ".join(missing)),
                    file=sys.stderr,
                )
                sys.exit(1)
            fieldnames = list(reader.fieldnames)  # Preserve original column order
            out_fieldnames = fieldnames + ["inactive_days"]
            for idx, row in enumerate(reader, 1):
                counters["total_rows"] += 1
                # Normalize whitespace on the required fields
                for key in REQUIRED_FIELDS:
                    if key in row and row[key] is not None:
                        row[key] = row[key].strip()
                user_id = row["user_id"]
                # Email validation + blacklisted domains
                ok, reason = validate_email(row.get("email"), blacklist)
                if not ok:
                    counters[reason] += 1
                    continue
                # Parse last_login
                last_dt = parse_last_login(row.get("last_login"))
                if last_dt is None:
                    counters["invalid_last_login_format"] += 1
                    continue
                # Parse activity_score as a number (min-score is not applied here)
                try:
                    score_val = float(row.get("activity_score", ""))
                except (TypeError, ValueError):
                    counters["invalid_activity_score"] += 1
                    continue
                prev = best_by_user.get(user_id)
                if prev is None:
                    best_by_user[user_id] = {
                        "row": row,
                        "last_login_dt": last_dt,
                        "score": score_val,
                        "pos": idx,
                    }
                else:
                    # Keep the row with the latest last_login
                    if last_dt > prev["last_login_dt"]:
                        # Count the replaced older entry as a dedup discard
                        counters["deduplicated_older"] += 1
                        best_by_user[user_id] = {
                            "row": row,
                            "last_login_dt": last_dt,
                            "score": score_val,
                            "pos": idx,
                        }
                    else:
                        counters["deduplicated_older"] += 1
    except FileNotFoundError:
        print(f"Input file not found: {args.input}", file=sys.stderr)
        sys.exit(1)
    # Write results (apply final filters: roles, min-score, before)
    rows_kept = 0
    out_dir = os.path.dirname(os.path.abspath(args.output)) or "."
    os.makedirs(out_dir, exist_ok=True)
    try:
        with open(args.output, "w", encoding="utf-8", newline="") as out_f:
            writer = csv.DictWriter(out_f, fieldnames=out_fieldnames)
            writer.writeheader()
            # Sort by pos to approximate first-seen order (pos is where the kept record was read)
            for item in sorted(best_by_user.values(), key=lambda x: x["pos"]):
                row = item["row"]
                if roles_set is not None and row.get("role") not in roles_set:
                    counters["role_excluded"] += 1
                    continue
                # Score filter
                if item["score"] < args.min_score:
                    counters["score_too_low"] += 1
                    continue
                # Date filter: keep only users whose last_login is before before_date
                last_login_date = item["last_login_dt"].date()
                if not (last_login_date < before_date):
                    counters["last_login_not_before"] += 1
                    continue
                inactive_days = (before_date - last_login_date).days
                # Output row: original column order plus the new inactive_days
                out_row = {k: row.get(k, "") for k in out_fieldnames}
                out_row["inactive_days"] = inactive_days
                writer.writerow(out_row)
                rows_kept += 1
    except OSError as e:
        print(f"Failed to write output: {e}", file=sys.stderr)
        sys.exit(1)
    # Write summary JSON (same directory and base name as the output, with .summary.json)
    summary_path = os.path.splitext(os.path.abspath(args.output))[0] + ".summary.json"
    summary = {
        "input": os.path.abspath(args.input),
        "output": os.path.abspath(args.output),
        "total_rows": counters.get("total_rows", 0),
        "rows_kept": rows_kept,
        "discard_reasons": {
            k: v
            for k, v in counters.items()
            if k
            in {
                "invalid_email",
                "disposable_domain",
                "invalid_last_login_format",
                "invalid_activity_score",
                "deduplicated_older",
                "role_excluded",
                "last_login_not_before",
                "score_too_low",
            }
        },
        "filters": {
            "roles": sorted(list(roles_set)) if roles_set is not None else None,
            "before": before_date.isoformat(),
            "min_score": args.min_score,
        },
    }
    try:
        with open(summary_path, "w", encoding="utf-8") as jf:
            json.dump(summary, jf, ensure_ascii=False, indent=2)
    except OSError as e:
        print(f"Failed to write summary: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
Parameter notes:
- --input: input CSV (UTF-8, with header); must contain user_id, email, role, last_login, activity_score.
- --output: output CSV path; a <output>.summary.json is written alongside it.
- --roles: optional comma-separated, case-sensitive role list to keep (e.g. dev,qa).
- --before: required; keep only users whose last_login falls strictly before this date (YYYY-MM-DD).
- --min-score: keep users with activity_score >= this value (default 0).
Discard-reason counters (discard_reasons in summary.json): invalid_email, disposable_domain, invalid_last_login_format, invalid_activity_score, deduplicated_older, role_excluded, last_login_not_before, score_too_low.
Example command: python clean_users.py --input users.csv --output users_clean.csv --roles dev,qa --before 2024-10-01 --min-score 60
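To make the dedup and discard accounting concrete, consider this hypothetical users.csv (illustrative data only):

user_id,email,role,last_login,activity_score
u1,alice@example.com,dev,2024-05-01,80
u1,alice@example.com,dev,2024-06-15T09:00:00,75
u2,bob@mailinator.com,qa,2024-04-02,90
u3,carol@example.com,ops,2024-03-10,70
u4,dave@example,dev,2024-02-01,65

With the example command above, u1's older row is dropped as deduplicated_older, u2 is discarded as disposable_domain, u3 as role_excluded, and u4 as invalid_email. The output keeps one row: u1 with last_login 2024-06-15 and inactive_days 108 (days from 2024-06-15 to 2024-10-01), and users_clean.summary.json records total_rows 5 and rows_kept 1.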
Additional suggestions (coding best practices in brief):
- Treat the built-in disposable-domain list as a starting point and extend it for your own traffic.
- Normalize timestamps to one convention (here, naive UTC) before comparing them.
- Emit a machine-readable summary next to the output, as the script does, so pipelines can assert on counts.
Below is the Python 3 script log_filter.py that meets your requirements. It auto-detects gzip compression, streams CSV/TSV line by line, and supports time-window, level, service, and regex filtering, optional column selection, statistics on stderr, and a --stats-only mode. No third-party libraries are used.
Save it as log_filter.py and run your example command.
#!/usr/bin/env python3
import argparse
import csv
import gzip
import re
import sys
from datetime import datetime, timezone, timedelta
REQUIRED_COLUMNS = ["ts", "level", "service", "trace_id", "message"]
def detect_gzip(path):
    try:
        with open(path, "rb") as f:
            sig = f.read(2)
        return sig == b"\x1f\x8b"
    except Exception:
        return False


def open_input(path, is_gzip, encoding="utf-8"):
    if is_gzip:
        return gzip.open(path, "rt", encoding=encoding, newline="")
    else:
        return open(path, "r", encoding=encoding, newline="")


def open_output(path, encoding="utf-8"):
    return open(path, "w", encoding=encoding, newline="")
ISO8601_RE = re.compile(
    r"^(\d{4})-(\d{2})-(\d{2})"
    r"T(\d{2}):(\d{2}):(\d{2})"
    r"(?:\.(\d{1,6}))?"
    r"(Z|[+-]\d{2}:\d{2})?$"
)


def parse_iso8601_to_epoch(s):
    # Parse common ISO8601 forms: YYYY-MM-DDTHH:MM:SS[.ffffff][Z|±HH:MM]
    # Assumes naive timestamps are UTC.
    if not s:
        return None
    m = ISO8601_RE.match(s.strip())
    if not m:
        return None
    year, mon, day = int(m.group(1)), int(m.group(2)), int(m.group(3))
    hour, minute, sec = int(m.group(4)), int(m.group(5)), int(m.group(6))
    frac = m.group(7)
    tz = m.group(8)

    micro = 0
    if frac:
        # Pad to 6 digits
        micro = int(frac.ljust(6, "0"))
    tzinfo = timezone.utc
    if tz and tz != "Z":
        sign = 1 if tz[0] == "+" else -1
        tzh = int(tz[1:3])
        tzm = int(tz[4:6])
        tzinfo = timezone(sign * timedelta(hours=tzh, minutes=tzm))
    try:
        dt = datetime(year, mon, day, hour, minute, sec, microsecond=micro, tzinfo=tzinfo)
        # Convert to epoch seconds
        return dt.timestamp()
    except Exception:
        return None
def parse_levels(values):
    # Support multiple values via nargs and/or comma-separated
    if not values:
        return None
    out = []
    for v in values:
        if v is None:
            continue
        parts = [p.strip() for p in v.split(",") if p.strip()]
        out.extend(parts)
    out = [x.upper() for x in out]
    return set(out) if out else None
def build_arg_parser():
    epilog = (
        "Usage examples:\n"
        "  1) Basic filtering and export:\n"
        "     python log_filter.py --input app.csv.gz --output err.csv --levels ERROR --regex 'timeout|failed' \\\n"
        "       --start 2025-01-01T00:00:00 --end 2025-01-31T23:59:59 --select-columns ts,level,service,message\n"
        "  2) Filter INFO and WARN for service 'api':\n"
        "     python log_filter.py --input logs.tsv --format tsv --output filtered.tsv --levels INFO,WARN --service api\n"
        "  3) Statistics only (no output file):\n"
        "     python log_filter.py --input app.csv.gz --levels ERROR --stats-only\n"
        "Notes:\n"
        "  - gzip input is detected automatically; the file extension does not matter.\n"
        "  - Timestamps must be ISO8601, e.g. 2025-01-01T00:00:00, optionally with Z/±HH:MM;\n"
        "    naive timestamps are treated as UTC.\n"
        "  - --select-columns exports only the given columns, in the input header's original order.\n"
        "  - Rows with unparsable timestamps or missing key columns (ts,level,service,trace_id,message)\n"
        "    are counted as discarded.\n"
    )
    p = argparse.ArgumentParser(
        prog="log_filter.py",
        description="Efficient log filtering: stream CSV/TSV logs with level, regex, time-window and service filters; statistics go to stderr.",
        epilog=epilog,
        formatter_class=argparse.RawTextHelpFormatter,
    )
    p.add_argument("--input", required=True, help="Input log file path (gzip supported)")
    p.add_argument("--output", help="Output file path; may be omitted with --stats-only")
    p.add_argument("--format", choices=["csv", "tsv"], default="csv", help="Input/output delimiter format, default csv")
    p.add_argument("--levels", nargs="+", help="Levels to keep (multi-value), e.g. --levels INFO ERROR or --levels INFO,ERROR")
    p.add_argument("--regex", help="Regex applied to the message field (re.search)")
    p.add_argument("--start", help="Start time (ISO8601), e.g. 2025-01-01T00:00:00")
    p.add_argument("--end", help="End time (ISO8601), e.g. 2025-01-31T23:59:59")
    p.add_argument("--service", help="Exact service-name match")
    p.add_argument("--select-columns", help="Columns to export (comma-separated), e.g. ts,level,service,message")
    p.add_argument("--stats-only", action="store_true", help="Only print statistics to stderr; do not write an output file")
    return p
def process_file(args):
    # Detect gzip
    is_gzip = detect_gzip(args.input)

    # Validate output path unless stats-only
    if not args.stats_only and not args.output:
        print("Error: --output not provided and --stats-only not enabled", file=sys.stderr)
        sys.exit(2)

    # Prepare filters
    level_set = parse_levels(args.levels)
    regex = re.compile(args.regex) if args.regex else None
    start_epoch = parse_iso8601_to_epoch(args.start) if args.start else None
    if args.start and start_epoch is None:
        print(f"Error: cannot parse --start time: {args.start}", file=sys.stderr)
        sys.exit(2)
    end_epoch = parse_iso8601_to_epoch(args.end) if args.end else None
    if args.end and end_epoch is None:
        print(f"Error: cannot parse --end time: {args.end}", file=sys.stderr)
        sys.exit(2)

    # Prepare reader and writer
    delimiter = "," if args.format == "csv" else "\t"
    try:
        fin = open_input(args.input, is_gzip)
    except Exception as e:
        print(f"Error: cannot open input file: {e}", file=sys.stderr)
        sys.exit(1)

    fout = None
    writer = None
    if not args.stats_only:
        try:
            fout = open_output(args.output)
            writer = csv.writer(fout, delimiter=delimiter, lineterminator="\n", quoting=csv.QUOTE_MINIMAL)
        except Exception as e:
            print(f"Error: cannot open output file: {e}", file=sys.stderr)
            fin.close()
            sys.exit(1)
    reader = csv.reader(fin, delimiter=delimiter)

    # Read header
    try:
        header = next(reader)
    except StopIteration:
        print("Error: input file is empty", file=sys.stderr)
        fin.close()
        if fout:
            fout.close()
        sys.exit(1)
    except Exception as e:
        print(f"Error: failed to read header: {e}", file=sys.stderr)
        fin.close()
        if fout:
            fout.close()
        sys.exit(1)
    # Map required columns
    col_index = {name: None for name in REQUIRED_COLUMNS}
    name_to_idx = {name: idx for idx, name in enumerate(header)}
    missing_in_header = [name for name in REQUIRED_COLUMNS if name not in name_to_idx]
    if missing_in_header:
        print(f"Error: header is missing key columns: {', '.join(missing_in_header)}", file=sys.stderr)
        fin.close()
        if fout:
            fout.close()
        sys.exit(1)
    for name in REQUIRED_COLUMNS:
        col_index[name] = name_to_idx[name]

    # Prepare selected columns (preserving original header order)
    if args.select_columns:
        selected = [c.strip() for c in args.select_columns.split(",") if c.strip()]
        unknown = [c for c in selected if c not in name_to_idx]
        if unknown:
            print(f"Error: --select-columns contains columns not in the input header: {', '.join(unknown)}", file=sys.stderr)
            fin.close()
            if fout:
                fout.close()
            sys.exit(2)
        selected_set = set(selected)
        out_indices = [i for i, col in enumerate(header) if col in selected_set]
    else:
        out_indices = list(range(len(header)))

    # Write header to output
    if writer:
        writer.writerow([header[i] for i in out_indices])
    # Stats
    rows_total = 0
    rows_kept = 0
    rows_discarded = 0
    kept_per_level = {}
    discard_reasons = {
        "missing_required_columns": 0,
        "bad_timestamp": 0,
        "out_of_window": 0,
        "level_mismatch": 0,
        "service_mismatch": 0,
        "regex_no_match": 0,
    }
    # Stream rows
    for row in reader:
        rows_total += 1
        # Validate required columns presence and not empty
        valid = True
        for name, idx in col_index.items():
            if idx >= len(row) or row[idx] == "":
                valid = False
                break
        if not valid:
            rows_discarded += 1
            discard_reasons["missing_required_columns"] += 1
            continue
        ts_str = row[col_index["ts"]]
        level_val = row[col_index["level"]]
        service_val = row[col_index["service"]]
        message_val = row[col_index["message"]]
        # Parse timestamp
        ts_epoch = parse_iso8601_to_epoch(ts_str)
        if ts_epoch is None:
            rows_discarded += 1
            discard_reasons["bad_timestamp"] += 1
            continue
        # Time window
        if start_epoch is not None and ts_epoch < start_epoch:
            rows_discarded += 1
            discard_reasons["out_of_window"] += 1
            continue
        if end_epoch is not None and ts_epoch > end_epoch:
            rows_discarded += 1
            discard_reasons["out_of_window"] += 1
            continue
        # Level filter
        lvl_up = level_val.upper()
        if level_set is not None and lvl_up not in level_set:
            rows_discarded += 1
            discard_reasons["level_mismatch"] += 1
            continue
        # Service filter
        if args.service is not None and service_val != args.service:
            rows_discarded += 1
            discard_reasons["service_mismatch"] += 1
            continue
        # Regex on message
        if regex is not None and not regex.search(message_val):
            rows_discarded += 1
            discard_reasons["regex_no_match"] += 1
            continue
        # Keep row
        rows_kept += 1
        kept_per_level[lvl_up] = kept_per_level.get(lvl_up, 0) + 1
        if writer:
            try:
                writer.writerow([row[i] for i in out_indices])
            except Exception:
                # A failed write is a hard error: stop instead of silently discarding
                print("Error: failed to write output", file=sys.stderr)
                fin.close()
                fout.close()
                sys.exit(1)
    # Close files
    fin.close()
    if fout:
        fout.close()

    # Print stats to stderr
    print("Statistics:", file=sys.stderr)
    print(f"- Input file: {args.input}", file=sys.stderr)
    print(f"- Compression: {'gzip' if is_gzip else 'none'}", file=sys.stderr)
    print(f"- Format: {args.format}", file=sys.stderr)
    print(f"- Total rows (excluding header): {rows_total}", file=sys.stderr)
    print(f"- Rows kept: {rows_kept}", file=sys.stderr)
    print(f"- Rows discarded: {rows_discarded}", file=sys.stderr)
    print("- Discard reasons:", file=sys.stderr)
    for k in ["missing_required_columns", "bad_timestamp", "out_of_window", "level_mismatch", "service_mismatch", "regex_no_match"]:
        print(f"  * {k}: {discard_reasons[k]}", file=sys.stderr)
    print("- Kept count per level:", file=sys.stderr)
    if kept_per_level:
        for lvl in sorted(kept_per_level.keys()):
            print(f"  * {lvl}: {kept_per_level[lvl]}", file=sys.stderr)
    else:
        print("  * (none)", file=sys.stderr)
def main():
    parser = build_arg_parser()
    args = parser.parse_args()
    process_file(args)


if __name__ == "__main__":
    main()
Additional notes and practical suggestions:
- gzip detection reads the two-byte magic number rather than trusting the file extension.
- All filtering is streaming, so memory use stays flat even on very large logs.
- Keeping statistics on stderr leaves the filtered data written to --output clean for downstream tools.
Provide developers with efficient programming support and guidance: solve everyday pain points, improve code quality, speed up debugging, and choose suitable development tools and technical approaches.
Get up to speed quickly in multiple programming languages, receive real-time guidance and code-optimization suggestions, and finish small projects or single-feature implementations with ease.
Improve coding efficiency and code quality, resolve technical problems encountered at work, and optimize your current project's development workflow.
Find the tools, libraries, and frameworks that fit your team or project goals, and drive the team toward more efficient development practices.
Copy a template-generated prompt into your usual chat app (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suited to quick personal trials and lightweight scenarios.
Turn a prompt template into an API: your program can modify the template parameters at will and call it directly through the interface, making automation and batch processing straightforward; a minimal sketch follows. Suited to developer integration and embedding into business systems.
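As a minimal sketch of that integration, using only the standard library (the endpoint URL, Bearer token, and the "template_id"/"params" field names are assumptions for illustration; substitute whatever the actual prompt-template API defines):

#!/usr/bin/env python3
# Minimal sketch: call a prompt-template API with custom parameters.
# NOTE: the URL, auth header, and JSON field names below are hypothetical.
import json
import urllib.request

payload = json.dumps({
    "template_id": "code-review-assistant",      # hypothetical template id
    "params": {"language": "python", "focus": "error handling"},
}).encode("utf-8")

req = urllib.request.Request(
    "https://api.example.com/v1/prompts/render",  # hypothetical endpoint
    data=payload,
    headers={
        "Content-Type": "application/json",
        "Authorization": "Bearer YOUR_API_KEY",   # hypothetical auth scheme
    },
    method="POST",
)
# Send the request and print the rendered prompt returned by the service
with urllib.request.urlopen(req) as resp:
    rendered = json.loads(resp.read().decode("utf-8"))
print(rendered)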
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt templates automatically; a sample configuration follows. Suited to advanced users and team collaboration, letting prompts move seamlessly across AI tools.
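The exact configuration schema depends on the MCP client; as one common shape (a Claude Desktop-style JSON entry, with a hypothetical server name and address):

{
  "mcpServers": {
    "prompt-templates": {
      "url": "https://mcp.example.com/prompt-server"
    }
  }
}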