热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
本提示词针对商业数据清洗场景,自动评估CSV数据质量,生成缺失值处理、标准化和异常值处理规则,并输出可执行Python清洗脚本和清洗效果报告,帮助提升数据可靠性并支撑后续分析。
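在阅读完整脚本前,也可以先用几行 pandas 对任意 CSV 做一次最小化的质量体检(以下为示意代码,文件名 orders.csv 与数值列名均为假设,请按实际数据替换):

import pandas as pd

# 读取时统一按字符串处理,便于先观察原始质量问题(文件名为假设)
df = pd.read_csv("orders.csv", dtype=str, keep_default_na=False,
                 na_values=["", "NA", "N/A", "null"])

# 缺失值:各列空值数量
print(df.isna().sum())

# 重复:完全重复的行数(若有业务主键,可改用 subset=["order_id"])
print("重复行:", int(df.duplicated().sum()))

# 数值列中无法转为数字的记录数(列名为假设)
for col in ["qty", "unit_price", "amount"]:
    if col in df.columns:
        bad = pd.to_numeric(df[col], errors="coerce").isna() & df[col].notna()
        print(col, "非数值记录:", int(bad.sum()))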
Python自动化清洗脚本

#!/usr/bin/env python3
"""
订单数据清洗与质量评估脚本

功能:
    评估CSV数据质量,修复字段错位、日期与编码问题,标准化地区/状态/币种与数值,
    校验并修正金额,合并重复订单,按IQR标记异常值,输出清洗结果、日志与质量报告。

使用方式:
    python clean_orders.py --input input_orders.csv --output-dir output --base-currency CNY --rate-USD 7.0
    若未提供 --input,将自动写入本题示例数据到 ./input_orders.csv 并读取。
"""
import os
import sys
import json
import math
import argparse
import logging
import datetime as dt
from typing import Dict, Any, List, Tuple

import pandas as pd
import numpy as np
EXPECTED_COLUMNS = [ "order_id","order_date","customer_id","region","item","qty", "unit_price","currency","amount","discount","status","note" ]
DEFAULT_BASE_CCY = "CNY"
DEFAULT_EXCHANGE_RATES = {
    "USD": 7.0,  # 可按需通过命令行覆盖
    "CNY": 1.0,
    "RMB": 1.0
}

REGION_MAP = {
    "华东": "华东", "hua dong": "华东", "hua_dong": "华东", "huadong": "华东", "华東": "华东",
    "华北": "华北", "华南": "华南", "华中": "华中",
    "西南": "西南", "西北": "西北", "东北": "东北"
}
STATUS_MAP = {
    "已支付": "已支付", "paid": "已支付", "paid ": "已支付", "paid.": "已支付", "paid,": "已支付",
    "paid/": "已支付", "paid\\": "已支付", "paid;": "已支付", "paid:": "已支付", "paid!": "已支付",
    "paid?": "已支付", "PAID": "已支付", "Paid": "已支付",
    "已取消": "已取消", "取消": "已取消",
    "未支付": "未支付", "待支付": "未支付", "pending": "未支付"
}

def setup_logging(output_dir: str):
    os.makedirs(output_dir, exist_ok=True)
    log_path = os.path.join(output_dir, "cleaning_log.txt")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(log_path, encoding="utf-8"),
            logging.StreamHandler(sys.stdout)
        ]
    )
    logging.info("日志初始化完成 -> %s", log_path)
    return log_path

def write_sample_if_missing(path: str):
    if os.path.exists(path):
        return
    sample = """order_id,order_date,customer_id,region,item,qty,unit_price,currency,amount,discount,status,note
1001,2024-01-05,CU001,华东,咖啡豆A,2,58,CNY,116,0.05,已支付,首单优惠
1001,2024-01-05,CU001,华东,咖啡豆A,2,58,CNY,116,0.05,已支付,重复提交
1002,1/6/2024,CU002,Hua Dong,咖啡豆A,,58,CNY,,0,已支付,缺失数量与金额
1003,2024/01/07,CU003,华北,奶茶粉B,1000,12,CNY,12000,0.1,已支付,数量异常
1004,2024-01-08,CU004,华东,巧克力C,3,$9.5,USD,28.5,0,PAID,英文状态与币种
1005,2024-01-09,CU005,华南,曲奇D,5,15,CNY,60,0,已支付,金额不匹配(应=75)
1006,2024-13-01,CU006,华中,蛋糕E,1,50,CNY,50,0,已支付,日期错误
1007,2024-01-10,CU007,华东,酸奶F,2,???,CNY,??,0,已支付,文本乱码
1008,2024-01-11,CU008,华东,生鲜组合,1,30,CNY,30,0,已支付,正常记录
1009,2024-01-11,CU009,华东,生鲜,组合,1,30,CNY,30,0,已支付,字段错位
1010,2024-01-12,CU010,Ã¥—æ‘,牛奶G,2,5,CNY,10,0,已支付,编码问题
1011,2024-01-12,CU011,华東,面包H,3,7,CNY,21,0,已支付,地区拼写变体
"""
    with open(path, "w", encoding="utf-8") as f:
        f.write(sample)

def read_csv(input_path: str) -> pd.DataFrame:
    df = pd.read_csv(input_path, dtype=str, keep_default_na=False,
                     na_values=["", "NA", "N/A", "null", "Null", "NULL"])
    # 去除列名空格与隐藏字符
    df.columns = [c.strip() for c in df.columns]
    # 校验必要列是否齐全
    missing_cols = [c for c in EXPECTED_COLUMNS if c not in df.columns]
    if missing_cols:
        raise ValueError(f"输入缺少必要列: {missing_cols}")
    # 仅保留所需列并固定顺序
    df = df[EXPECTED_COLUMNS].copy()
    return df
def safe_strip(x):
    if isinstance(x, str):
        return x.strip()
    return x

def maybe_fix_encoding(s: str) -> str:
    """尝试修复常见mojibake:latin1误解码的UTF-8"""
    if not isinstance(s, str) or s == "":
        return s
    try:
        repaired = s.encode("latin1").decode("utf-8")

        # 简单启发:若中文字符数量增加则采用修复结果
        def zh_count(txt):
            return sum(1 for ch in txt if '\u4e00' <= ch <= '\u9fff')

        if zh_count(repaired) > zh_count(s):
            return repaired
    except Exception:
        pass
    return s

def normalize_region(s: str) -> Tuple[str, bool]:
    if not isinstance(s, str):
        return s, False
    original = s
    s1 = maybe_fix_encoding(s).strip()
    key = s1.replace(" ", "_").lower()
    normalized = REGION_MAP.get(key, REGION_MAP.get(s1, None))
    if normalized:
        return normalized, (normalized != original)
    # 若包含已知中文区域词根则归一到该区域
    for cand in ["华东", "华南", "华北", "华中", "西南", "西北", "东北"]:
        if cand in s1:
            return cand, (cand != original)
    # 无法识别则保留原值
    return s1, (s1 != original)

def normalize_status(s: str) -> Tuple[str, bool]:
    if not isinstance(s, str):
        return s, False
    original = s
    key = s.strip().lower()
    normalized = STATUS_MAP.get(key, STATUS_MAP.get(s.strip(), s.strip()))
    return normalized, (normalized != original)

def parse_discount(x) -> Tuple[float, bool]:
    """返回 (折扣率0-1, 是否更改)"""
    if x is None or (isinstance(x, float) and np.isnan(x)) or x == "":
        return (0.0, True)
    s = str(x).strip()
    if s.endswith("%"):
        try:
            v = float(s[:-1])
            return (round(v / 100.0, 4), True)
        except Exception:
            return (0.0, True)
    # 纯数值;若>1,按百分比处理
    try:
        v = float(s)
        if v > 1.0:
            return (round(v / 100.0, 4), True)
        return (round(v, 4), False)
    except Exception:
        return (0.0, True)

def clean_number(s: Any) -> Tuple[float, bool]:
    """清理数值(去符号、去逗号、去未知值),返回 (float或NaN, 是否更改)"""
    if s is None:
        return (np.nan, False)
    if isinstance(s, (int, float)) and not isinstance(s, bool):
        return (float(s), False)
    txt = str(s).strip()
    original = txt
    # 处理未知/乱码占位
    if txt in ["???", "??", "--", "-", "N/A", "NA", ""]:
        return (np.nan, True)
    # 去货币符号和千分位逗号
    txt = txt.replace(",", "").replace("¥", "").replace("￥", "").replace("$", "")
    changed = (txt != original)
    try:
        return (float(txt), changed)
    except Exception:
        return (np.nan, True)

def is_number(s: Any) -> bool:
    try:
        float(str(s))
        return True
    except Exception:
        return False

def fix_field_misalignment(row: pd.Series) -> Tuple[pd.Series, bool]:
    """
    针对 qty 非数值而后续字段呈现右移的情况进行修正。
    规则:若 qty 非数值 且 unit_price 为数值 且 currency 非标准币种(或为数值),
    则认为 item 与 qty 被误拆分,合并 item 并整体左移一位。
    """
    changed = False
    qty = row["qty"]
    unit_price = row["unit_price"]
    currency = row["currency"]
qty_is_num = is_number(qty)
unit_price_is_num = is_number(unit_price)
# 若currency看起来不像币种(例如为数字),或长度>4非字母,视为错位信号
currency_suspect = (is_number(currency) or (isinstance(currency, str) and (len(currency) > 4 and not currency.isalpha())))
if (not qty_is_num) and unit_price_is_num and currency_suspect:
# 左移一位:item = item + qty; 其余字段整体左移
order = ["item","qty","unit_price","currency","amount","discount","status","note"]
vals = [row[k] for k in order]
# 合并 item 与 qty
new_item = str(vals[0]) + str(vals[1])
shifted = [new_item] + vals[2:] + ["字段错位-自动修复"] # 左移并填补note
for k, v in zip(order, shifted):
row[k] = v
changed = True
return row, changed
def parse_date_str(s: Any) -> Tuple[str, bool, str]:
    """
    解析日期,统一为 YYYY-MM-DD。返回: (标准化日期或空, 是否更改, 问题说明)
    策略:
    - 优先尝试格式:%Y-%m-%d, %Y/%m/%d, %m/%d/%Y
    - 若失败,尝试 %Y-%d-%m(处理 '2024-13-01' -> 2024-01-13)
    - 对歧义 '1/6/2024' 按业务上下文(样本多为1月)假定为 MM/DD/YYYY => 2024-01-06
    """
    if not isinstance(s, str) or s.strip() == "":
        return "", False, "empty"
    s = s.strip()
    fmts = ["%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y"]
    for f in fmts:
        try:
            d = dt.datetime.strptime(s, f).date()
            return d.isoformat(), (s != d.isoformat()), ""
        except ValueError:
            pass
    # 兜底:尝试 %Y-%d-%m 并记录更正
    try:
        d = dt.datetime.strptime(s, "%Y-%d-%m").date()
        return d.isoformat(), True, "swapped_day_month"
    except ValueError:
        pass
    return "", False, "invalid"

def currency_normalize(row: pd.Series, base_ccy: str, rates: Dict[str, float]) -> Tuple[pd.Series, bool]:
    """将不同币种换算为基础币种,保留 original_* 字段,确保可追溯。"""
    changed = False
    # 原始值备份(若无则写入)
    if "original_currency" not in row or pd.isna(row.get("original_currency", np.nan)):
        row["original_currency"] = row["currency"]
    if "original_unit_price" not in row or pd.isna(row.get("original_unit_price", np.nan)):
        row["original_unit_price"] = row["unit_price"]
    if "original_amount" not in row or pd.isna(row.get("original_amount", np.nan)):
        row["original_amount"] = row["amount"]
ccy = str(row["currency"]).strip() if isinstance(row["currency"], str) else row["currency"]
ccy_upper = str(ccy).upper()
# 同义词归一
if ccy_upper in ["RMB", "CNY", "¥", "￥"]:
ccy_upper = "CNY"
if ccy_upper not in rates:
# 未知币种,保持不变,仅记录
return row, changed
if ccy_upper != base_ccy:
rate = rates[ccy_upper]
if is_number(row["unit_price"]):
row["unit_price"] = round(float(row["unit_price"]) * rate, 4)
changed = True
if is_number(row["amount"]):
row["amount"] = round(float(row["amount"]) * rate, 4)
changed = True
row["currency"] = base_ccy
if changed:
row["issue_flags"] = append_flag(row.get("issue_flags",""), "currency_converted")
return row, changed
def append_flag(flags: str, new_flag: str) -> str:
    s = (flags or "").strip()
    if s == "":
        return new_flag
    parts = set(p.strip() for p in s.split(",") if p.strip())
    parts.add(new_flag)
    return ",".join(sorted(parts))

def compute_expected_amount(qty, unit_price, discount):
    if any([pd.isna(qty), pd.isna(unit_price), pd.isna(discount)]):
        return np.nan
    try:
        return round(float(qty) * float(unit_price) * (1.0 - float(discount)), 2)
    except Exception:
        return np.nan

def pre_quality_scan(df: pd.DataFrame) -> Dict[str, Any]:
    m = {}
    m["total_rows_raw"] = len(df)
    m["duplicate_order_id_rows"] = int(df.duplicated(subset=["order_id"]).sum())
    m["missing_qty_rows"] = int((df["qty"].astype(str).replace({"": np.nan}).isna()
                                 | ~df["qty"].astype(str).apply(is_number)).sum())
    m["missing_unit_price_rows"] = int((df["unit_price"].astype(str).replace({"": np.nan}).isna()
                                        | ~df["unit_price"].astype(str).apply(
                                            lambda x: is_number(str(x).replace("$", "").replace(",", "")))).sum())
    m["missing_amount_rows"] = int((df["amount"].astype(str).replace({"": np.nan}).isna()
                                    | ~df["amount"].astype(str).apply(
                                        lambda x: is_number(str(x).replace("$", "").replace(",", "")))).sum())
    # 非标准日期(解析失败计数)
    invalid_dates = 0
    for s in df["order_date"].astype(str).tolist():
        std, chg, note = parse_date_str(s)
        if note == "invalid":
            invalid_dates += 1
    m["invalid_date_rows"] = invalid_dates
    # 非标准地区
    nonstd_region = 0
    for s in df["region"].astype(str).tolist():
        norm, changed = normalize_region(s)
        if changed:
            nonstd_region += 1
    m["region_normalization_needed_rows"] = nonstd_region
    # 非标准状态
    nonstd_status = 0
    for s in df["status"].astype(str).tolist():
        norm, changed = normalize_status(s)
        if changed:
            nonstd_status += 1
    m["status_normalization_needed_rows"] = nonstd_status
    # 可能错位
    misaligned = 0
    for _, row in df.iterrows():
        _, changed = fix_field_misalignment(row.copy())
        if changed:
            misaligned += 1
    m["field_misalignment_rows"] = misaligned
    # 非基础币种
    m["non_base_currency_rows"] = int((~df["currency"].str.upper().isin(["CNY", "RMB"])).sum())
    return m

def post_quality_scan(df: pd.DataFrame) -> Dict[str, Any]:
    m = {}
    m["total_rows_cleaned"] = len(df)
    m["duplicate_order_id_rows"] = int(df.duplicated(subset=["order_id"]).sum())
    m["missing_qty_rows"] = int(df["qty"].isna().sum())
    m["missing_unit_price_rows"] = int(df["unit_price"].isna().sum())
    m["missing_amount_rows"] = int(df["amount"].isna().sum())
    m["invalid_date_rows"] = int(df["order_date"].isna().sum())
    m["non_base_currency_rows"] = int((df["currency"].str.upper() != "CNY").sum())
    # 各类处理标记计数
    for flag in ["currency_converted", "amount_corrected", "field_misalignment_fixed", "encoding_fixed",
                 "region_normalized", "status_normalized", "date_corrected", "duplicate_merged",
                 "outlier_qty", "outlier_unit_price"]:
        m[f"flag_{flag}"] = int(df["issue_flags"].fillna("").str.contains(flag).sum())
    return m

def merge_duplicates(df: pd.DataFrame, log: logging.Logger) -> Tuple[pd.DataFrame, int]:
    """
    基于 order_id 合并重复:
    - 数值字段优先非空且校验通过
    - 文本字段合并备注
    - 保留 earliest order_date(或修正后的标准日期)并统一
    """
    if df["order_id"].duplicated().sum() == 0:
        return df, 0
cols = df.columns.tolist()
groups = []
removed = 0
for oid, g in df.groupby("order_id", sort=False):
if len(g) == 1:
groups.append(g.iloc[0])
continue
removed += len(g) - 1
# 选取首条为基准
base = g.iloc[0].copy()
# 合并 note
all_notes = [str(x) for x in g["note"].fillna("").tolist() if str(x).strip()!=""]
base["note"] = ";".join(dict.fromkeys(all_notes)) if all_notes else base.get("note","")
# 状态取规范化后一致值,若不一致优先“已支付”
statuses = g["status"].fillna("").tolist()
if "已支付" in statuses:
base["status"] = "已支付"
else:
base["status"] = statuses[0] if statuses else base["status"]
# 对数值字段:qty, unit_price, amount, discount 选非空优先,并在后续统一校验
for c in ["qty","unit_price","amount","discount"]:
vals = [v for v in g[c].tolist() if not pd.isna(v)]
if len(vals)>0:
# 若存在多个不同值,记录冲突
if len(set([str(v) for v in vals]))>1:
base["issue_flags"] = append_flag(base.get("issue_flags",""), f"duplicate_conflict_{c}")
base[c] = vals[0]
# 其他文本字段以首条为主
base["issue_flags"] = append_flag(base.get("issue_flags",""), "duplicate_merged")
groups.append(base)
df2 = pd.DataFrame(groups, columns=cols)
return df2, removed
def detect_outliers_iqr(series: pd.Series) -> pd.Series:
    """IQR法标记异常:低于 Q1-1.5*IQR 或高于 Q3+1.5*IQR,返回布尔序列"""
    s = series.dropna().astype(float)
    if len(s) < 4:
        return pd.Series([False] * len(series), index=series.index)
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    out = (series.astype(float) < low) | (series.astype(float) > high)
    return out.fillna(False)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", type=str, default="input_orders.csv", help="输入CSV路径")
    parser.add_argument("--output-dir", type=str, default="output", help="输出目录")
    parser.add_argument("--base-currency", type=str, default=DEFAULT_BASE_CCY, help="基础币种(默认CNY)")
    parser.add_argument("--rate-USD", type=float, default=DEFAULT_EXCHANGE_RATES["USD"], help="USD兑基础币种的汇率")
    args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
log_path = setup_logging(args.output_dir)
write_sample_if_missing(args.input)
# 读取
df_raw = read_csv(args.input)
# 保存原始快照
raw_snapshot = os.path.join(args.output_dir, "raw_snapshot.csv")
df_raw.to_csv(raw_snapshot, index=False, encoding="utf-8-sig")
logging.info("原始数据快照保存: %s", raw_snapshot)
# 预评估
pre_metrics = pre_quality_scan(df_raw)
logging.info("清洗前质量评估: %s", json.dumps(pre_metrics, ensure_ascii=False))
df = df_raw.copy()
# 基础清理:去空格
for c in df.columns:
df[c] = df[c].apply(safe_strip)
# 字段错位修复
misaligned_fixed = 0
new_rows = []
for _, row in df.iterrows():
row, changed = fix_field_misalignment(row)
if changed:
row["issue_flags"] = append_flag(row.get("issue_flags",""), "field_misalignment_fixed")
misaligned_fixed += 1
new_rows.append(row)
df = pd.DataFrame(new_rows, columns=df.columns)
# 文本规范化与编码修复
enc_fixed = 0
region_normed = 0
status_normed = 0
std_dates = 0
invalid_dates = 0
std_dates_list = []
region_list = []
status_list = []
for i, row in df.iterrows():
# 日期
std, changed, note = parse_date_str(row["order_date"])
if std != "":
if changed or (row["order_date"] != std):
row["issue_flags"] = append_flag(row.get("issue_flags",""), "date_corrected")
std_dates += 1
row["order_date"] = std
else:
if note == "invalid":
invalid_dates += 1
row["issue_flags"] = append_flag(row.get("issue_flags",""), "date_invalid")
row["order_date"] = np.nan
else:
row["order_date"] = np.nan
# 地区
before_region = row["region"]
rnorm, rchg = normalize_region(before_region)
if rchg:
region_normed += 1
row["issue_flags"] = append_flag(row.get("issue_flags",""), "region_normalized")
if maybe_fix_encoding(before_region) != before_region:
enc_fixed += 1
row["issue_flags"] = append_flag(row.get("issue_flags",""), "encoding_fixed")
row["region"] = rnorm
# 状态
snorm, schg = normalize_status(row["status"])
if schg:
status_normed += 1
row["issue_flags"] = append_flag(row.get("issue_flags",""), "status_normalized")
row["status"] = snorm
std_dates_list.append(row["order_date"])
region_list.append(row["region"])
status_list.append(row["status"])
df.loc[i] = row
# 数值标准化:qty, unit_price, amount, discount
qty_list = []
up_list = []
amt_list = []
disc_list = []
for i, row in df.iterrows():
# qty
q_raw = row["qty"]
qv_changed = False
if q_raw is None or str(q_raw).strip()=="":
q = np.nan
qv_changed = True
else:
# 去掉非数字
if is_number(q_raw):
q = float(q_raw)
else:
q = np.nan
qv_changed = True
# unit_price
up, chg_up = clean_number(row["unit_price"])
# amount
amt, chg_amt = clean_number(row["amount"])
# discount
disc, chg_disc = parse_discount(row["discount"])
if qv_changed or chg_up or chg_amt or chg_disc:
row["issue_flags"] = append_flag(row.get("issue_flags",""), "numeric_cleaned")
qty_list.append(q)  # 可能为浮点或NaN,后续统一转int
up_list.append(up)
amt_list.append(amt)
disc_list.append(disc)
df.loc[i] = row
df["qty"] = qty_list
df["unit_price"] = up_list
df["amount"] = amt_list
df["discount"] = disc_list
# 货币转换到基础币种
rates = dict(DEFAULT_EXCHANGE_RATES)
if args.base_currency.upper() not in rates:
rates[args.base_currency.upper()] = 1.0
# 覆盖USD汇率
rates["USD"] = float(args.rate_USD) if hasattr(args, "rate_USD") else DEFAULT_EXCHANGE_RATES["USD"]
conv_changed = 0
for i, row in df.iterrows():
row, changed = currency_normalize(row, args.base_currency.upper(), rates)
if changed:
conv_changed += 1
df.loc[i] = row
# 金额核对与修正:amount = qty * unit_price * (1 - discount)
amount_corrected = 0
for i, row in df.iterrows():
exp_amt = compute_expected_amount(row["qty"], row["unit_price"], row["discount"])
if not pd.isna(exp_amt):
if pd.isna(row["amount"]) or (abs(float(row["amount"]) - exp_amt) > 0.01):
df.at[i, "amount"] = exp_amt
df.at[i, "issue_flags"] = append_flag(row.get("issue_flags",""), "amount_corrected")
amount_corrected += 1
# 数据类型微调
# qty尽可能为整数
def to_int_or_nan(v):
if pd.isna(v):
return np.nan
try:
if float(v).is_integer():
return int(float(v))
return int(round(float(v)))
except:
return np.nan
df["qty"] = df["qty"].apply(to_int_or_nan)
# 单价保留四位小数,金额保留两位
df["unit_price"] = df["unit_price"].apply(lambda x: round(float(x), 4) if (not pd.isna(x)) else x)
df["amount"] = df["amount"].apply(lambda x: round(float(x), 2) if (not pd.isna(x)) else x)
# 合并重复订单
df, removed_dup = merge_duplicates(df, logging)
logging.info("重复记录合并完成,移除行数: %s", removed_dup)
# 分组异常检测(按 item 分组)
df["outlier_qty"] = False
df["outlier_unit_price"] = False
for item, g in df.groupby("item"):
idx = g.index
if g["qty"].notna().sum() >= 4:
out_q = detect_outliers_iqr(g["qty"].astype(float))
df.loc[idx, "outlier_qty"] = out_q
if g["unit_price"].notna().sum() >= 4:
out_p = detect_outliers_iqr(g["unit_price"].astype(float))
df.loc[idx, "outlier_unit_price"] = out_p
# 将异常标记写入 issue_flags,不进行值修改以保护业务信息
df.loc[df["outlier_qty"]==True, "issue_flags"] = df.loc[df["outlier_qty"]==True, "issue_flags"].apply(lambda x: append_flag(x, "outlier_qty"))
df.loc[df["outlier_unit_price"]==True, "issue_flags"] = df.loc[df["outlier_unit_price"]==True, "issue_flags"].apply(lambda x: append_flag(x, "outlier_unit_price"))
# 事后质量评估
post_metrics = post_quality_scan(df)
# 输出文件
cleaned_path = os.path.join(args.output_dir, "cleaned_orders.csv")
report_path = os.path.join(args.output_dir, "quality_report.json")
# 输出顺序与追溯字段
output_cols = [
"order_id","order_date","customer_id","region","item",
"qty","unit_price","currency","amount","discount",
"status","note",
"original_currency","original_unit_price","original_amount",
"issue_flags","outlier_qty","outlier_unit_price"
]
for c in output_cols:
if c not in df.columns:
df[c] = np.nan
df = df[output_cols]
df.to_csv(cleaned_path, index=False, encoding="utf-8-sig")
full_report = {
"run_time": dt.datetime.now().isoformat(),
"base_currency": args.base_currency.upper(),
"exchange_rates_used": rates,
"paths": {
"log": log_path,
"raw_snapshot": raw_snapshot,
"cleaned_csv": cleaned_path
},
"metrics_before": pre_metrics,
"metrics_after": post_metrics,
"notes": {
"field_misalignment_fixed": "检测到qty非数值且后续字段错位时,将item与qty合并并整体左移一位",
"date_correction": "按多格式解析并在必要时采用YYYY-DD-MM作为兜底修正(示例: 2024-13-01 -> 2024-01-13)",
"currency_conversion": "将USD等换算为基础币种CNY,保留original_*追溯;汇率来自脚本参数",
"amount_validation": "使用 amount = qty * unit_price * (1 - discount) 校验并修正",
"duplicates_merge": "按order_id合并重复并合并备注,保留非空且一致性更高的值",
"outlier_detection": "按item分组使用IQR法标记极端值但不自动修改"
}
}
with open(report_path, "w", encoding="utf-8") as f:
json.dump(full_report, f, ensure_ascii=False, indent=2)
logging.info("清洗完成。清洗文件: %s, 质量报告: %s", cleaned_path, report_path)
print("\n=== 输出路径 ===")
print(f"清洗后CSV: {cleaned_path}")
print(f"质量报告JSON: {report_path}")
print(f"处理日志: {log_path}")
if __name__ == "__main__":
    main()
清洗效果评估
输出清洗后CSV数据路径:./output/cleaned_orders.csv(质量报告:./output/quality_report.json,日志:./output/cleaning_log.txt)
关键指标对比(基于所给样例数据的预期结果,实际以脚本输出为准)
业务价值评估
优化与持续监控建议
说明
Python自动化清洗脚本

以下脚本可直接运行:会先尝试从 ./data/input.csv 读取数据;若不存在,会自动用你提供的样例内容生成该文件。脚本将完成数据质量评估、字段错位修复、缺失值与格式统一、异常值标记、去重、输出清洗结果与质量报告,并生成处理日志,保证可追溯。
请将以下代码保存为 clean_events.py 后执行:python clean_events.py
#!/usr/bin/env python3
"""
事件数据自动清洗脚本
"""
import os
import re
import json
import math
import logging
import unicodedata
import calendar
from pathlib import Path
from datetime import datetime

import pandas as pd
import numpy as np
BASE_DIR = Path(".").resolve()
DATA_DIR = BASE_DIR / "data"
OUT_DIR = BASE_DIR / "output"
DATA_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)

INPUT_CSV = DATA_DIR / "input.csv"
CLEANED_CSV = OUT_DIR / "cleaned_events.csv"
DUPLICATES_CSV = OUT_DIR / "duplicates.csv"
ANOMALIES_CSV = OUT_DIR / "anomalies_flagged.csv"
QUALITY_REPORT_JSON = OUT_DIR / "quality_report.json"
LOG_FILE = OUT_DIR / "cleaning.log"

logging.basicConfig(
    filename=str(LOG_FILE),
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
SAMPLE_CSV = """event_id,event_time,user_id,action,channel,device,city,duration_ms,spend,spend_unit,remark
e1,2024-02-01 10:01:02,U100,click,app,Android,深圳,450,0,CNY,正常
e1,2024-02-01 10:01:02,U100,click,app,Android,深圳,450,0,CNY,重复事件
e2,02/01/2024 10:02:03,U101,Click,App,iOS,深圳,abc,0,CNY,时长非数字
e3,2024/02/01 10:03:04,U102,regsitser,web,Web,上海,2300,0,CNY,拼写错误
e4,2024-02-30 09:00:00,U103,login,web,Web,广州,500,0,CNY,日期错误
e5,2024-02-01T10:05:06+08:00,U104,购买,app,Android,深圳,500,2.5,yuan,单位不统一
e6,2024-02-01 10:06:07,U105,purchase,app,Android,深圳,南山区,600,3,CNY,字段错位
e7,2024-02-01 10:07:08,U106,点击??,app,Android,厦门,1200000,0,CNY,异常值与乱码
e8,2024-02-01 10:08:09,U107,購買,app,Android,深圳,650,1,CNY,编码不规范
e9,2024-02-01 10:09:10,U108,logout,app,Android,ShenZhen,50,0,CNY,城市英文混入
"""
if not INPUT_CSV.exists():
INPUT_CSV.write_text(SAMPLE_CSV, encoding="utf-8")
logger.info(f"Wrote sample CSV to {INPUT_CSV}")
def normalize_text(s: str) -> str:
    """Unicode标准化 + 去除不可见字符 + 去掉两端空白"""
    if pd.isna(s):
        return s
    s = str(s)
    s = unicodedata.normalize("NFKC", s)
    # 去掉常见乱码占位符(Unicode替换符)
    s = s.replace("\ufffd", "")
    # 压缩重复问号等
    s = re.sub(r"\?{2,}", "?", s)
    # 去除控制字符
    s = "".join(ch for ch in s if unicodedata.category(ch)[0] != "C")
    return s.strip()

def looks_like_subcity(token: str) -> bool:
    """判断是否像城市子级(区/县/市等),用于字段错位修复"""
    if not token:
        return False
    t = token.strip()
    if re.search(r"[0-9]", t):
        return False
    # 包含常见中文行政区后缀且较短
    if re.search(r"(区|县|市|镇|乡)$", t) and 1 <= len(t) <= 6:
        return True
    # 英文且较短,可能是城市拼写碎片
    if re.match(r"^[A-Za-z-]{2,10}$", t):
        return True
    return False

def fix_row_tokens(header_cols, tokens):
    """修复字段错位:将city后多出的行政区碎片合并到city;其余多余字段并入remark末尾"""
    ncols = len(header_cols)
    city_idx = header_cols.index("city")
    remark_idx = header_cols.index("remark")
    notes = []
    repaired = False
# 先合并city后面的一个或多个碎片
while len(tokens) > ncols and looks_like_subcity(tokens[city_idx + 1]):
tokens[city_idx] = f"{tokens[city_idx].strip()}-{tokens[city_idx + 1].strip()}"
del tokens[city_idx + 1]
repaired = True
notes.append("fix_city_merge_subcity")
# 仍多出的部分:并入remark(用;拼接),保持长度一致
if len(tokens) > ncols:
overflow = tokens[ncols - 1:] # remark及其后的所有
base_remark = tokens[remark_idx] if remark_idx < len(tokens) else ""
merged = ";".join([base_remark] + overflow[1:]).strip(";")
tokens = tokens[:remark_idx] + [merged]
repaired = True
notes.append("fix_remark_merge_overflow")
# 不足则补空
if len(tokens) < ncols:
tokens = tokens + [""] * (ncols - len(tokens))
repaired = True
notes.append("fix_pad_empty")
return tokens, repaired, notes
def parse_event_time_safe(s: str):
    """稳健解析event_time;若无效日期(如2月30日),回退为该月最后一天;保留本地时区时间(去tz)"""
    s = normalize_text(s)
    ts = pd.to_datetime(s, errors="coerce")
    fixed = False
    reason = None
if pd.isna(ts):
# 检测YYYY-MM-DD HH:MM:SS形式的非法日
m = re.match(r"^\s*(\d{4})[-/](\d{2})[-/](\d{2})[ T](\d{2}):(\d{2}):(\d{2})\s*$", s)
if m:
y, mo, d, hh, mm, ss = map(int, m.groups())
# 若日期非法,调整为当月最后一天
last_day = calendar.monthrange(y, mo)[1]
if d > last_day:
d = last_day
fixed = True
reason = "fixed_invalid_day_to_month_end"
try:
ts = pd.Timestamp(year=y, month=mo, day=d, hour=hh, minute=mm, second=ss)
except Exception:
ts = pd.NaT
if isinstance(ts, pd.Timestamp) and ts.tz is not None:
# 转为Asia/Shanghai并去掉tz
try:
ts = ts.tz_convert("Asia/Shanghai").tz_localize(None)
fixed = True
if not reason:
reason = "tz_converted_to_Asia_Shanghai"
except Exception:
# 有些tz是naive;忽略
ts = ts.tz_localize(None)
fixed = True
if not reason:
reason = "tz_removed"
return ts, fixed, reason
def standardize_action(raw: str):
    """动作标准化到集合:click/register/login/purchase/logout,返回标准值与是否修正"""
    s = normalize_text(raw).lower()
    # 去除非字母、非汉字字符
    s2 = re.sub(r"[^a-z\u4e00-\u9fa5]", "", s)
mapping = {
"click": "click",
"注册": "register",
"注册用户": "register",
"register": "register",
"登录": "login",
"login": "login",
"登出": "logout",
"logout": "logout",
"购买": "purchase",
"購買": "purchase",
"买": "purchase",
"purchase": "purchase",
"点击": "click",
}
# 精确或近似匹配
candidates = [s2, s]
for c in candidates:
if c in mapping:
val = mapping[c]
return val, (val != raw)
# 常见拼写错误修复
if s2 in {"regsitser", "regsiter", "regist", "regster"}:
return "register", True
if s2 in {"logon"}:
return "login", True
# 模糊:中文里含“购”“买”
if re.search(r"[购買买]", s2):
return "purchase", True
if "点" in s2 or "击" in s2:
return "click", True
# 未知,保留原文但标记
return raw, False
def standardize_channel(raw: str):
    s = normalize_text(raw).lower()
    # app/web 已是标准取值,其余保留小写形式
    return s, (s != raw)

def standardize_device(raw: str):
    s = normalize_text(raw).lower()
    mapping = {"android": "Android", "ios": "iOS", "web": "Web"}
    if s in mapping:
        return mapping[s], (mapping[s] != raw)
    # 保底:首字母大写
    std = s.capitalize()
    return std, (std != raw)

def standardize_city(raw: str):
    s = normalize_text(raw)
    # 统一英文/大小写到中文名
    m = s.lower()
    mapping = {
        "shenzhen": "深圳",
        "shen zhen": "深圳",
    }
    if m in mapping:
        return mapping[m], (mapping[m] != raw)
    # 若包含"-子级",保留
    return s, (s != raw)

def to_float_safe(x):
    x = normalize_text(x)
    if x == "" or x is None:
        return np.nan
    try:
        return float(x)
    except Exception:
        # 抽取其中的数字
        m = re.search(r"-?\d+(\.\d+)?", x)
        return float(m.group()) if m else np.nan

def to_int_safe(x):
    f = to_float_safe(x)
    if pd.isna(f):
        return np.nan
    try:
        return int(round(f))
    except Exception:
        return np.nan

def iqr_outlier_flags(series: pd.Series):
    """返回是否低/高异常(IQR方法),样本不足时退化为硬阈值"""
    s = pd.to_numeric(series, errors="coerce")
    valid = s.dropna()
    high = pd.Series([False] * len(series), index=series.index)
    low = pd.Series([False] * len(series), index=series.index)
if len(valid) >= 5:
q1, q3 = valid.quantile(0.25), valid.quantile(0.75)
iqr = q3 - q1
upper = q3 + 1.5 * iqr
lower = q1 - 1.5 * iqr
high = s > upper
low = s < lower
else:
# 退化阈值:负值或超过30分钟
high = s > 30 * 60 * 1000
low = s < 0
return low.fillna(False), high.fillna(False)
with INPUT_CSV.open("r", encoding="utf-8") as f: lines = [line.rstrip("\n") for line in f]
if not lines: raise RuntimeError("输入CSV为空")
header = [h.strip() for h in lines[0].split(",")]
raw_rows = []
fix_notes = []
for i, line in enumerate(lines[1:], start=2):  # 从第2行起(人类计数)
    tokens = [t.strip() for t in line.split(",")]
    tokens, repaired, notes = fix_row_tokens(header, tokens)
    raw_rows.append(tokens)
    fix_notes.append({"line_number": i, "repaired": repaired, "notes": notes})
df = pd.DataFrame(raw_rows, columns=header)
df_orig = df.copy(deep=True)
df["_clean_notes"] = ""
def add_note(idx, note): df.at[idx, "_clean_notes"] = (df.at[idx, "_clean_notes"] + ";" + note).strip(";")
rows_before = len(df)
dup_by_eventid_before = int(df.duplicated(subset=["event_id"]).sum())
dup_full_before = int(df.duplicated().sum())
missing_before = df.replace({"": np.nan}).isna().sum().to_dict()

logger.info(f"Rows before: {rows_before}, dup(event_id): {dup_by_eventid_before}, dup(full): {dup_full_before}")
logger.info(f"Missing before (empty as NaN): {missing_before}")
for col in df.columns: df[col] = df[col].apply(lambda x: normalize_text(x))
df["event_time_parsed"] = pd.NaT
df["event_time_fixed"] = False
df["event_time_fix_reason"] = ""
for idx, val in df["event_time"].items():
    ts, fixed, reason = parse_event_time_safe(val)
    df.at[idx, "event_time_parsed"] = ts
    df.at[idx, "event_time_fixed"] = bool(fixed)
    df.at[idx, "event_time_fix_reason"] = reason if reason else ""
    if fixed:
        add_note(idx, f"time_fixed:{reason}")
df["event_time"] = df["event_time_parsed"]
df.drop(columns=["event_time_parsed"], inplace=True)

df["action_std_changed"] = False
for idx, val in df["action"].items():
    std, changed = standardize_action(val)
    df.at[idx, "action"] = std
    df.at[idx, "action_std_changed"] = bool(changed)
    if changed:
        add_note(idx, f"action_std:{val}->{std}")

df["channel_std_changed"] = False
df["device_std_changed"] = False
for idx in df.index:
    ch, chg = standardize_channel(df.at[idx, "channel"])
    df.at[idx, "channel"] = ch
    df.at[idx, "channel_std_changed"] = bool(chg)
    dv, dchg = standardize_device(df.at[idx, "device"])
    df.at[idx, "device"] = dv
    df.at[idx, "device_std_changed"] = bool(dchg)
    if chg:
        add_note(idx, "channel_std")
    if dchg:
        add_note(idx, "device_std")

df["city_std_changed"] = False
for idx, val in df["city"].items():
    std, changed = standardize_city(val)
    df.at[idx, "city"] = std
    df.at[idx, "city_std_changed"] = bool(changed)
    if changed:
        add_note(idx, f"city_std:{val}->{std}")

df["duration_ms_raw"] = df["duration_ms"]
df["duration_ms"] = df["duration_ms"].apply(to_int_safe)

df["spend_raw"] = df["spend"]
df["spend"] = df["spend"].apply(to_float_safe)

df["spend_unit_std_changed"] = False
for idx, val in df["spend_unit"].items():
    v = normalize_text(val).lower()
    std = "CNY" if v in {"cny", "rmb", "元", "yuan"} else v
    df.at[idx, "spend_unit_std_changed"] = (std != val)
    df.at[idx, "spend_unit"] = std
    if std != val:
        add_note(idx, f"spend_unit_std:{val}->{std}")

low_flag, high_flag = iqr_outlier_flags(df["duration_ms"])
df["duration_outlier_low"] = low_flag.values
df["duration_outlier_high"] = high_flag.values
df["duration_outlier"] = df["duration_outlier_low"] | df["duration_outlier_high"]

df["duration_imputed"] = False
if df["duration_ms"].isna().any():
    # 计算组内中位数(排除异常值)
    robust_median = (
        df.loc[~df["duration_outlier"], ["action", "duration_ms"]]
        .groupby("action", dropna=False)["duration_ms"]
        .median()
    )
    for idx in df.index[df["duration_ms"].isna()]:
        g = df.at[idx, "action"]
        med = robust_median.get(g, np.nan)
        if pd.isna(med):
            med = df.loc[~df["duration_outlier"], "duration_ms"].median()
        if not pd.isna(med):
            df.at[idx, "duration_ms"] = int(round(med))
            df.at[idx, "duration_imputed"] = True
            add_note(idx, f"duration_imputed:{med}")
def is_text_garbled(s: str) -> bool:
    if s is None:
        return False
    s = str(s)
    # 含连续问号、Unicode替换符等明显乱码特征
    return ("??" in s) or ("\ufffd" in s)

df["remark_garbled"] = df["remark"].apply(is_text_garbled)
df["action_garbled"] = df["action"].apply(is_text_garbled)
for idx in df.index:
    if df.at[idx, "remark_garbled"] or df.at[idx, "action_garbled"]:
        add_note(idx, "text_garbled_cleaned")

df["is_duplicate_eventid"] = df.duplicated(subset=["event_id"], keep="first")
dups = df.loc[df["is_duplicate_eventid"]].copy()
if not dups.empty:
    dups.to_csv(DUPLICATES_CSV, index=False, encoding="utf-8-sig")
    logger.info(f"Saved duplicates to {DUPLICATES_CSV} count={len(dups)}")

df_clean = df.loc[~df["is_duplicate_eventid"]].copy()
rows_after = len(df_clean)

biz_cols = ["event_id", "event_time", "user_id", "action", "channel", "device", "city",
            "duration_ms", "spend", "spend_unit", "remark"]
flag_cols = [
    "event_time_fixed", "event_time_fix_reason", "action_std_changed", "channel_std_changed",
    "device_std_changed", "city_std_changed", "spend_unit_std_changed",
    "duration_outlier", "duration_outlier_low", "duration_outlier_high",
    "duration_imputed", "is_duplicate_eventid"
]
trace_cols = ["duration_ms_raw", "spend_raw", "_clean_notes"]

anomalies = df_clean.loc[df_clean["duration_outlier"] | df_clean["remark_garbled"] | df_clean["action_garbled"]].copy()
if not anomalies.empty:
    anomalies.to_csv(ANOMALIES_CSV, index=False, encoding="utf-8-sig")
    logger.info(f"Saved anomalies to {ANOMALIES_CSV} count={len(anomalies)}")

df_clean[biz_cols + flag_cols + trace_cols].to_csv(CLEANED_CSV, index=False, encoding="utf-8-sig")
logger.info(f"Saved cleaned CSV to {CLEANED_CSV} rows={len(df_clean)}")

report = {
    "rows_before": rows_before,
    "rows_after": rows_after,
    "duplicates_by_eventid_before": int(dup_by_eventid_before),
    "duplicates_removed": int(dup_by_eventid_before),
    "full_row_duplicates_before": int(dup_full_before),
    "missing_before_empty_as_nan": missing_before,
    "datetime_fixed_count": int(df["event_time_fixed"].sum()),
    "duration_missing_after_impute": int(df_clean["duration_ms"].isna().sum()),
    "duration_imputed_count": int(df_clean["duration_imputed"].sum()),
    "duration_outlier_count": int(df_clean["duration_outlier"].sum()),
    "action_standardized_count": int(df_clean["action_std_changed"].sum()),
    "channel_standardized_count": int(df_clean["channel_std_changed"].sum()),
    "device_standardized_count": int(df_clean["device_std_changed"].sum()),
    "city_standardized_count": int(df_clean["city_std_changed"].sum()),
    "spend_unit_standardized_count": int(df_clean["spend_unit_std_changed"].sum()),
    "garbled_text_rows": int((df_clean["remark_garbled"] | df_clean["action_garbled"]).sum()),
    "outputs": {
        "cleaned_csv": str(CLEANED_CSV),
        "duplicates_csv": str(DUPLICATES_CSV),
        "anomalies_csv": str(ANOMALIES_CSV),
        "quality_report_json": str(QUALITY_REPORT_JSON),
        "log_file": str(LOG_FILE),
    },
    "field_misalignment_fixes": fix_notes,
}
with open(QUALITY_REPORT_JSON, "w", encoding="utf-8") as f:
    json.dump(report, f, ensure_ascii=False, indent=2)

print(str(CLEANED_CSV))
print(str(QUALITY_REPORT_JSON))
print(str(LOG_FILE))
清洗后的CSV数据路径:./output/cleaned_events.csv(重复记录:./output/duplicates.csv,异常标记:./output/anomalies_flagged.csv,质量报告:./output/quality_report.json)

清洗效果评估

基于你提供的样例数据(10行,事件ID e1-e9,其中e1重复一条),预计关键指标对比如下:
业务价值分析
优化与持续监控建议
说明
如需对接你们的生产路径或调整规则阈值,请告知实际目录与口径,我会给出配置化版本。
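配置化版本的一种可能形式(仅为示意,键名与默认值均为假设,实际以你的生产目录与口径为准):

# 示意:将路径与阈值集中到一个配置字典,清洗脚本其余部分只读取 CONFIG(键名为假设)
CONFIG = {
    "input_csv": "./data/input.csv",         # 输入路径,按生产目录调整
    "output_dir": "./output",                # 输出目录
    "iqr_k": 1.5,                            # IQR 异常判定系数
    "duration_hard_max_ms": 30 * 60 * 1000,  # 样本不足时的硬阈值(30分钟)
    "base_spend_unit": "CNY",                # 金额单位口径
    "timezone": "Asia/Shanghai",             # 时间统一所用时区
}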
Python自动化清洗脚本

下面的脚本可直接运行。若当前目录不存在 input.csv,会自动写入你提供的示例数据并完成全流程清洗、生成日志与质量报告。运行完成后,将在 ./output 目录下生成标准化后的 cleaned.csv、清洗日志 cleaning_log.txt 和质量报告 quality_report.json。
请将以下内容保存为 clean_inventory.py 并运行:python clean_inventory.py
#!/usr/bin/env python3
"""
库存/进销存CSV数据清洗脚本

使用说明:
    python clean_inventory.py
    若当前目录不存在 input.csv,脚本会先写入示例数据,再执行完整清洗流程。

依赖:pandas, numpy
安装:pip install pandas numpy
"""
import os
import re
import json
import math
import logging
import unicodedata
from datetime import datetime
from typing import List, Dict

try:
    import pandas as pd
    import numpy as np
except Exception as e:
    raise SystemExit("请先安装所需依赖: pip install pandas numpy\n错误: {}".format(e))

INPUT_PATH = "./input.csv"
OUTPUT_DIR = "./output"
NORMALIZED_PATH = os.path.join(OUTPUT_DIR, "_normalized.csv")
CLEANED_PATH = os.path.join(OUTPUT_DIR, "cleaned.csv")
REPORT_PATH = os.path.join(OUTPUT_DIR, "quality_report.json")
LOG_PATH = os.path.join(OUTPUT_DIR, "cleaning_log.txt")
EXPECTED_COLUMNS = [ "record_id","tx_date","sku","sku_name","warehouse","in_qty","out_qty", "unit","unit_cost","currency","total_cost","batch","remark" ]
BASE_CURRENCY = "CNY"
ENABLE_FX = False
FX_RATE = {
    # "USD": 7.20,  # 示例:1 USD = 7.20 CNY
}
IQR_K = 1.5
AMOUNT_TOL = 1e-6
os.makedirs(OUTPUT_DIR, exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.FileHandler(LOG_PATH, encoding="utf-8"),
        logging.StreamHandler()
    ]
)
log = logging.getLogger("cleaner")

SAMPLE_CSV = """record_id,tx_date,sku,sku_name,warehouse,in_qty,out_qty,unit,unit_cost,currency,total_cost,batch,remark
r001,2024-03-01,S1001,矿泉水500ml,A仓,100,0,瓶,1.2,CNY,120,,入库
r002,2024/03/01,S1001,矿泉水500ML,A仓,,10,箱,30,CNY,300,20240301,数量缺失与单位不同
r003,2024-03-02,S1002,饼干-巧克力味,B仓,50,0,袋,¥2.5,CNY,125,,单价含符号
r004,2024-03-33,S1003,挂面,C仓,30,0,袋,1.8,CNY,5,,金额异常与日期错误
r005,2024-03-03,S1004,牛奶-原味,B仓,5000,0,瓶,3,CNY,15000,,数量异常
r006,2024-03-03,S1004,牛奶—原味,B仓,50,0,瓶,3,CNY,150,,破折号全角
r007,2024-03-04,S1005,酸奶,仓库,20,0,瓶,3,CNY,60,,正常
r007,2024-03-04,S1005,酸奶,仓库,20,0,瓶,3,CNY,60,,重复记录
r009,2024-03-05,S1006,薯片,C仓,10,0,包,2.5,USD,25,,币种不同
r010,2024-03-05,S1007,方便面,C仓,10,0,袋,2.,CNY,20,,单价格式异常
r011,2024-03-06,S1008,辣条,D仓,10,0,袋,2,CNY,20,批次A,备注含逗号,未引用
r012,2024-03-06,S1009,纱海ç›Â,D仓,10,0,袋,2,CNY,20,,文本乱码
r013,2024-03-06,S1010,矿泉誰,E仓,15,0,瓶,2.2,CNY,33,,拼写错误
"""

if not os.path.exists(INPUT_PATH):
    with open(INPUT_PATH, "w", encoding="utf-8") as f:
        f.write(SAMPLE_CSV)
    log.info("未找到输入文件,已写入示例数据到 %s", INPUT_PATH)
def safe_strip(x): return x.strip() if isinstance(x, str) else x
def nfkc(s: str) -> str:
    if not isinstance(s, str):
        return s
    s = s.strip()
    s = unicodedata.normalize("NFKC", s)
    # 标准化破折号/连字符
    s = s.replace("—", "-").replace("–", "-")
    # 标准化货币符号(全角转半角)
    s = s.replace("￥", "¥")
    return s

def looks_mojibake(s: str) -> bool:
    if not isinstance(s, str):
        return False
    # 简单启发式:包含典型乱码片段
    return ("Ã" in s) or ("Â" in s)

def fix_mojibake_try(s: str) -> str:
    """
    尝试修复常见UTF-8->latin1误解码的乱码。
    保守处理:若结果不可读则返回原文。
    """
    if not isinstance(s, str):
        return s
    try:
        tentative = s.encode("latin1", errors="ignore").decode("utf-8", errors="ignore")
        # 若修复结果非空且与原文不同,则采用
        if tentative and tentative != s:
            return tentative
    except Exception:
        pass
    return s

def parse_numeric(val):
    """数值解析:去除货币符号/千分位/多余文本,转为float"""
    if pd.isna(val):
        return np.nan
    if isinstance(val, (int, float)):
        return float(val)
    s = nfkc(str(val))
    s = s.replace(",", "")
    # 去除常见货币符号和字母
    s = re.sub(r"[A-Za-z¥￥$\s]", "", s)
    # 仅保留数字、小数点和负号
    s = re.sub(r"[^0-9.-]", "", s)
    if s in ("", ".", "-"):
        return np.nan
    try:
        return float(s)
    except Exception:
        return np.nan

def parse_date_safe(s, fallback=None):
    """
    日期解析:支持YYYY-MM-DD或YYYY/MM/DD;非法返回NaT。
    若fallback为8位数字批次(YYYYMMDD)且解析失败,则用fallback。
    """
    if pd.isna(s):
        s = ""
    s = nfkc(str(s))
    s = s.replace("/", "-")
    dt = pd.to_datetime(s, errors="coerce", format="mixed")
    if pd.isna(dt) and fallback:
        fb = nfkc(str(fallback))
        if re.fullmatch(r"\d{8}", fb):
            try:
                dt = pd.to_datetime(fb, format="%Y%m%d", errors="coerce")
            except Exception:
                dt = pd.NaT
    return dt
def normalize_csv(input_path, normalized_path, expected_cols: List[str]):
"""
规范化CSV:
- 修复未引用逗号导致的列错位:当列数>预期时,合并超出列到最后一列
- 当列数<预期时,用空值补齐
- 输出标准化CSV便于pandas读取
"""
import csv
    with open(input_path, "r", encoding="utf-8") as f_in, \
            open(normalized_path, "w", encoding="utf-8", newline="") as f_out:
reader = csv.reader(f_in)
writer = csv.writer(f_out)
header = next(reader, None)
# 忽略输入头与预期不一致的情况,使用预期头
writer.writerow(expected_cols)
for row in reader:
if row is None:
continue
# 合并多余的列到最后一列remark
if len(row) > len(expected_cols):
row = row[:len(expected_cols)-1] + [",".join(row[len(expected_cols)-1:])]
elif len(row) < len(expected_cols):
row = row + [""] * (len(expected_cols) - len(row))
writer.writerow(row)
log.info("已规范化CSV到: %s", normalized_path)
def iqr_outlier_flags(series: pd.Series):
    """IQR异常值标记,返回布尔Series"""
    s = series.dropna().astype(float)
    if len(s) < 4:
        return pd.Series([False] * len(series), index=series.index)
    q1, q3 = np.percentile(s, [25, 75])
    iqr = q3 - q1
    low = q1 - IQR_K * iqr
    high = q3 + IQR_K * iqr
    return (series < low) | (series > high)

normalize_csv(INPUT_PATH, NORMALIZED_PATH, EXPECTED_COLUMNS)
df = pd.read_csv(NORMALIZED_PATH, dtype=str, keep_default_na=False,
                 na_values=["", "NA", "NaN", "null", "None"])
df_raw = df.copy(deep=True)
for c in EXPECTED_COLUMNS: df[c] = df[c].apply(nfkc)
for c in ["in_qty", "out_qty", "unit_cost", "total_cost"]:
    df[c + "_raw"] = df[c]
    df[c] = df[c].apply(parse_numeric)

df["tx_date_raw"] = df["tx_date"]
df["tx_date"] = df.apply(lambda r: parse_date_safe(r["tx_date_raw"], fallback=r.get("batch")), axis=1)

for c in ["sku_name", "remark", "warehouse"]:
    df[c + "_raw"] = df[c]
    need_fix = df[c].apply(looks_mojibake)
    df.loc[need_fix, c] = df.loc[need_fix, c].apply(fix_mojibake_try)

def normalize_name(name: str) -> str:
    if not isinstance(name, str):
        return name
    s = nfkc(name)
    # 统一ml大小写(如 500ML -> 500ml)
    s = re.sub(r"(?<=\d)ml", "ml", s, flags=re.IGNORECASE)
    # 连字符已在nfkc中统一
    return s
df["sku_name"] = df["sku_name"].apply(normalize_name)
df["unit_raw2"] = df["unit"]
df["unit"] = df["unit"].apply(nfkc)

df["currency_raw"] = df["currency"]
df["currency"] = df["currency"].str.upper().str.strip()

def count_invalid_dates(series):
    return int(series.isna().sum())

before_report = {
    "rows": int(len(df_raw)),
    "missing": {c: int((df_raw[c] == "").sum()) for c in EXPECTED_COLUMNS},
    "invalid_dates": count_invalid_dates(df["tx_date"]),  # 解析后NaT视为无效
    "duplicates_by_record_id": int(df_raw.duplicated(subset=["record_id"]).sum()),
    "currency_heterogeneous": int((df["currency"] != BASE_CURRENCY).sum()),
    "amount_mismatch": 0,    # 稍后计算
    "unit_mixed_by_sku": 0,  # 稍后计算
    "mojibake_suspects": int(df_raw["sku_name"].apply(looks_mojibake).sum()),
}
qc_flags = []
df["qc_flags"] = ""
df["notes"] = ""

def add_flag(idx, flag):
    df.at[idx, "qc_flags"] = "|".join([x for x in [df.at[idx, "qc_flags"], flag] if x])

def add_note(idx, note):
    df.at[idx, "notes"] = " ; ".join([x for x in [df.at[idx, "notes"], note] if x])

for idx, r in df.iterrows():
    in_qty, out_qty, unit_cost, total_cost = r["in_qty"], r["out_qty"], r["unit_cost"], r["total_cost"]
# 填充缺失的in_qty
if (pd.isna(in_qty) or in_qty==0) and not pd.isna(unit_cost) and not pd.isna(total_cost) and unit_cost not in (0, 0.0):
inferred = total_cost / unit_cost
df.at[idx, "in_qty"] = inferred
add_flag(idx, "in_qty_inferred")
add_note(idx, f"in_qty由total_cost/unit_cost推导={inferred:.6f}")
# 填充缺失的unit_cost
in_qty = df.at[idx, "in_qty"]
out_qty = df.at[idx, "out_qty"]
if pd.isna(unit_cost) and not pd.isna(total_cost):
qty_ref = None
if not pd.isna(in_qty) and in_qty != 0:
qty_ref = in_qty
elif not pd.isna(out_qty) and out_qty != 0:
qty_ref = out_qty
if qty_ref:
uc = total_cost / qty_ref
df.at[idx, "unit_cost"] = uc
add_flag(idx, "unit_cost_inferred")
add_note(idx, f"unit_cost由total_cost/qty推导={uc:.6f}")
mismatch_count = 0
corrected_count = 0
for idx, r in df.iterrows():
    in_qty, out_qty, unit_cost, total_cost = r["in_qty"], r["out_qty"], r["unit_cost"], r["total_cost"]
    if pd.isna(unit_cost):
        continue
    qty_ref = None
    if not pd.isna(in_qty) and in_qty > 0:
        qty_ref = in_qty
    elif not pd.isna(out_qty) and out_qty > 0:
        qty_ref = out_qty
    if qty_ref is None:
        continue
    expected_total = unit_cost * qty_ref
    if pd.isna(total_cost) or abs(expected_total - total_cost) > AMOUNT_TOL:
        mismatch_count += 1
        # 修正金额
        df.at[idx, "total_cost"] = expected_total
        add_flag(idx, "total_cost_corrected")
        add_note(idx, f"金额由{total_cost}修正为{expected_total:.6f}")
        corrected_count += 1
before_report["amount_mismatch"] = mismatch_count
df["qty_for_outlier"] = df[["in_qty", "out_qty"]].fillna(0).max(axis=1)
df["is_outlier_qty"] = False
for (sku, unit), g in df.groupby(["sku", "unit"]):
    flags = iqr_outlier_flags(g["qty_for_outlier"])
    df.loc[g.index, "is_outlier_qty"] = flags
    df.loc[g.index[flags], "qc_flags"] = df.loc[g.index[flags], "qc_flags"].apply(
        lambda s: "|".join([x for x in [s, "qty_outlier"] if x]))

df["date_invalid"] = df["tx_date"].isna()
df.loc[df["date_invalid"], "qc_flags"] = df.loc[df["date_invalid"], "qc_flags"].apply(
    lambda s: "|".join([x for x in [s, "date_invalid"] if x]))

df["currency_mismatch"] = df["currency"] != BASE_CURRENCY
if ENABLE_FX:
    for idx, r in df.iterrows():
        cur = r["currency"]
        if cur != BASE_CURRENCY and isinstance(cur, str) and cur in FX_RATE:
            rate = FX_RATE[cur]
            # 仅计算一个本币金额参考,不覆盖原始金额
            if not pd.isna(r["total_cost"]):
                add_note(idx, f"按汇率{cur}->{BASE_CURRENCY}={rate} 折算本币金额供参考")
                df.at[idx, "total_cost_in_base"] = r["total_cost"] * rate
        elif cur != BASE_CURRENCY:
            add_note(idx, f"异币种未转换,缺少汇率: {cur}")

mixed_unit_skus = []
unit_map = df.groupby("sku")["unit"].nunique()
for sku, nunique in unit_map.items():
    if nunique > 1:
        mixed_unit_skus.append(sku)
before_report["unit_mixed_by_sku"] = len(mixed_unit_skus)

df["spelling_suspect"] = df["sku_name"].apply(lambda s: isinstance(s, str) and ("誰" in s or "?" in s))
df.loc[df["spelling_suspect"], "qc_flags"] = df.loc[df["spelling_suspect"], "qc_flags"].apply(
    lambda s: "|".join([x for x in [s, "spelling_suspect"] if x]))

dup_mask = df.duplicated(subset=["record_id"], keep=False)
duplicate_ids = df.loc[dup_mask, "record_id"].unique().tolist()
dup_count = int(df.duplicated(subset=["record_id"]).sum())

def merge_duplicates(group: pd.DataFrame) -> pd.Series:
    # 合并策略:保留第一条,备注去重拼接;关键字段冲突时打conflict标记
    g = group.sort_values(by=["tx_date_raw"], kind="stable")
    first = g.iloc[0].copy()
    conflicts = []
    # 合并备注
    remarks = [x for x in g["remark"].tolist() if isinstance(x, str) and x != ""]
    if remarks:
        merged_remark = " | ".join(pd.unique(remarks))
        first["remark"] = merged_remark
        first["qc_flags"] = "|".join([x for x in [first["qc_flags"], "merged_duplicate"] if x])
        first["notes"] = " ; ".join([x for x in [first["notes"], f"合并{len(g)-1}条重复备注"] if x])
    # 关键字段一致性检查
    key_cols = ["tx_date", "sku", "sku_name", "warehouse", "in_qty", "out_qty",
                "unit", "unit_cost", "currency", "total_cost", "batch"]
    for c in key_cols:
        vals = pd.unique(g[c].astype(str))
        if len(vals) > 1:
            conflicts.append(c)
    if conflicts:
        first["qc_flags"] = "|".join([x for x in [first["qc_flags"], "duplicate_conflict"] if x])
        first["notes"] = " ; ".join([x for x in [first["notes"], f"重复记录字段不一致: {','.join(conflicts)}"] if x])
    return first

if dup_count > 0:
    df = df.groupby("record_id", as_index=False, group_keys=False).apply(merge_duplicates)
    df = df.reset_index(drop=True)

after_report = {
    "rows": int(len(df)),
    "missing": {
        # 数值与日期列统计NaN/NaT,批次列统计空字符串
        "in_qty": int(df["in_qty"].isna().sum()),
        "out_qty": int(df["out_qty"].isna().sum()),
        "unit_cost": int(df["unit_cost"].isna().sum()),
        "total_cost": int(df["total_cost"].isna().sum()),
        "tx_date_invalid": int(df["tx_date"].isna().sum()),
        "batch_missing": int((df["batch"] == "").sum()),
    },
    "duplicates_by_record_id": int(df.duplicated(subset=["record_id"]).sum()),
    "amount_corrected": int(df["qc_flags"].str.contains("total_cost_corrected", na=False).sum()),
    "in_qty_inferred": int(df["qc_flags"].str.contains("in_qty_inferred", na=False).sum()),
    "unit_mixed_skus": mixed_unit_skus,
    "qty_outliers": int(df["is_outlier_qty"].sum()),
    "currency_mismatch": int(df["currency_mismatch"].sum()),
    "mojibake_fixed": int((df["sku_name_raw"] != df["sku_name"]).sum()),
    "spelling_suspect": int(df["spelling_suspect"].sum()),
}
quality_report = { "before": before_report, "after": after_report, "notes": { "base_currency": BASE_CURRENCY, "fx_enabled": ENABLE_FX, "fx_rate_available": list(FX_RATE.keys()) } }
df_export = df.copy()
df_export["tx_date"] = df_export["tx_date"].dt.strftime("%Y-%m-%d")
preferred_cols = [
    "record_id", "tx_date", "sku", "sku_name", "warehouse", "in_qty", "out_qty", "unit",
    "unit_cost", "currency", "total_cost", "batch", "remark",
    "tx_date_raw", "sku_name_raw", "unit_cost_raw", "total_cost_raw", "currency_raw", "unit_raw2", "remark_raw",
    "qc_flags", "notes"
]
for c in preferred_cols:
    if c not in df_export.columns:
        df_export[c] = None

df_export = df_export[preferred_cols]

df_export.to_csv(CLEANED_PATH, index=False, encoding="utf-8-sig")
with open(REPORT_PATH, "w", encoding="utf-8") as f:
    json.dump(quality_report, f, ensure_ascii=False, indent=2)

log.info("清洗完成:%s", CLEANED_PATH)
log.info("质量报告:%s", REPORT_PATH)
log.info("日志:%s", LOG_PATH)

print("\n=== 清洗效果摘要 ===")
print(json.dumps(quality_report, ensure_ascii=False, indent=2))
print(f"\n输出文件: {CLEANED_PATH}")
print(f"日志: {LOG_PATH}")
print(f"质量报告: {REPORT_PATH}")
清洗后的CSV数据路径 ./output/cleaned.csv
清洗效果评估

以下为基于你提供的数据样例的预期结果(运行脚本即可得到同等或更详尽的报告):
关键指标对比(预期/示例)
业务价值分析
优化建议
说明与合规
如需我将“单位换算表/汇率”接入并自动转换,请提供映射表(例如:S1001 箱->瓶=12;USD->CNY=7.20,生效区间)或允许我生成参数化模板供你维护。
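参数化模板的一种可能形式(仅为示意,结构与键名为假设,数值取自上文示例,生效日期为占位):

# 示意:单位换算与汇率映射模板,供清洗脚本按 SKU 与交易日期查表使用(键名为假设)
UNIT_CONVERSION = {
    # (sku, 原单位, 目标单位): 换算系数,例如 S1001 1箱 = 12瓶
    ("S1001", "箱", "瓶"): 12,
}

FX_RATES = [
    # 币种对 + 汇率 + 生效区间(日期为占位示例)
    {"from": "USD", "to": "CNY", "rate": 7.20,
     "valid_from": "2024-03-01", "valid_to": "2024-03-31"},
]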
为商业团队提供一套可直接上手的“数据清洗专家”提示词,让任何人都能快速产出可执行、可审计的清洗方案。通过专家化角色与闭环流程,自动完成数据质量体检、清洗规则设计、处理步骤规划、结果验证与优化建议,帮助你把杂乱数据迅速转化为可信的分析资产。核心价值:提升数据可用性与可信度、缩短项目周期、降低人力与沟通成本,支持销售、用户行为、库存、财务等常见业务场景与多种数据源格式。试用即得:只需输入数据源类型、主要问题、业务用途与质量要求,即刻生成清洗报告与可执行流程,助你从“先清洗后分析”迈向“清洗即分析”的高效协作。
用本提示快速完成数据质量评估与预处理,生成可视化清洗报告,提升模型训练与报表结论的可信度。
整合多渠道行为数据,一键统一时间与用户标识,去重与异常剔除,迅速产出可对比的转化与留存指标。
规范SKU、品牌与单位,修正价格与库存异常,清理重复商品档案,保障销量分析与补货决策准确。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。