数据清洗脚本生成

Nov 28, 2025更新

本提示词针对商业数据清洗场景,自动评估CSV数据质量,生成缺失值处理、标准化和异常值处理规则,并输出可执行Python清洗脚本和清洗效果报告,帮助提升数据可靠性并支撑后续分析。

Python自动化清洗脚本

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
订单数据清洗与质量评估脚本
功能:

  • 读取CSV,进行数据质量评估(缺失、重复、格式、异常)
  • 执行清洗:缺失填充/标记、格式统一(日期/文本/数值/币种)、字段错位修复、重复合并、金额核对与修正、单位换算
  • 分组异常检测(IQR法)并标记
  • 输出清洗后的CSV、质量报告JSON和处理日志
  • 严格遵循数据治理要求:不删除关键业务信息,所有修正可追溯(保留原字段副本与issue_flags)

使用方式:
    python clean_orders.py --input input_orders.csv --output-dir output --base-currency CNY --rate-USD 7.0
若未提供 --input,将自动写入本题示例数据到 ./input_orders.csv 并读取。
"""
import os
import sys
import json
import math
import argparse
import logging
import datetime as dt
from typing import Dict, Any, List, Tuple

import pandas as pd
import numpy as np

EXPECTED_COLUMNS = [
    "order_id", "order_date", "customer_id", "region", "item", "qty",
    "unit_price", "currency", "amount", "discount", "status", "note"
]

# 可配置参数:汇率、基础币种
DEFAULT_BASE_CCY = "CNY"
DEFAULT_EXCHANGE_RATES = {
    "USD": 7.0,  # 可按需通过命令行覆盖
    "CNY": 1.0,
    "RMB": 1.0
}

# 地区与状态标准化映射
REGION_MAP = {
    "华东": "华东", "hua dong": "华东", "hua_dong": "华东", "huadong": "华东", "华東": "华东",
    "华北": "华北", "华南": "华南", "华中": "华中", "西南": "西南", "西北": "西北", "东北": "东北"
}
STATUS_MAP = {
    "已支付": "已支付", "paid": "已支付", "paid.": "已支付", "paid,": "已支付", "paid/": "已支付",
    "paid;": "已支付", "paid:": "已支付", "paid!": "已支付", "paid?": "已支付",
    "PAID": "已支付", "Paid": "已支付",
    "已取消": "已取消", "取消": "已取消",
    "未支付": "未支付", "待支付": "未支付", "pending": "未支付"
}

def setup_logging(output_dir: str):
    os.makedirs(output_dir, exist_ok=True)
    log_path = os.path.join(output_dir, "cleaning_log.txt")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        handlers=[
            logging.FileHandler(log_path, encoding="utf-8"),
            logging.StreamHandler(sys.stdout)
        ]
    )
    logging.info("日志初始化完成 -> %s", log_path)
    return log_path

def write_sample_if_missing(path: str):
    if os.path.exists(path):
        return
    sample = """order_id,order_date,customer_id,region,item,qty,unit_price,currency,amount,discount,status,note
1001,2024-01-05,CU001,华东,咖啡豆A,2,58,CNY,116,0.05,已支付,首单优惠
1001,2024-01-05,CU001,华东,咖啡豆A,2,58,CNY,116,0.05,已支付,重复提交
1002,1/6/2024,CU002,Hua Dong,咖啡豆A,,58,CNY,,0,已支付,缺失数量与金额
1003,2024/01/07,CU003,华北,奶茶粉B,1000,12,CNY,12000,0.1,已支付,数量异常
1004,2024-01-08,CU004,华东,巧克力C,3,$9.5,USD,28.5,0,PAID,英文状态与币种
1005,2024-01-09,CU005,华南,曲奇D,5,15,CNY,60,0,已支付,金额不匹配(应=75)
1006,2024-13-01,CU006,华中,蛋糕E,1,50,CNY,50,0,已支付,日期错误
1007,2024-01-10,CU007,华东,酸奶F,2,???,CNY,??,0,已支付,文本乱码
1008,2024-01-11,CU008,华东,生鲜组合,1,30,CNY,30,0,已支付,正常记录
1009,2024-01-11,CU009,华东,生鲜,组合,1,30,CNY,30,0,已支付,字段错位
1010,2024-01-12,CU010,Ã¥—æ‘,牛奶G,2,5,CNY,10,0,已支付,编码问题
1011,2024-01-12,CU011,华東,面包H,3,7,CNY,21,0,已支付,地区拼写变体
"""
    with open(path, "w", encoding="utf-8") as f:
        f.write(sample)

def read_csv(input_path: str) -> pd.DataFrame:
    def merge_extra_fields(bad_line: List[str]) -> List[str]:
        # 行字段数多于表头时,将溢出字段并入最后一列,交由后续错位修复处理
        n = len(EXPECTED_COLUMNS)
        return bad_line[:n - 1] + [",".join(bad_line[n - 1:])]

    df = pd.read_csv(
        input_path, dtype=str, keep_default_na=False,
        na_values=["", "NA", "N/A", "null", "Null", "NULL"],
        engine="python", on_bad_lines=merge_extra_fields,
    )
    # 去除列名空格与隐藏字符
    df.columns = [c.strip() for c in df.columns]
    # 校验列
    missing_cols = [c for c in EXPECTED_COLUMNS if c not in df.columns]
    if missing_cols:
        raise ValueError(f"输入缺少必要列: {missing_cols}")
    # 仅保留所需列的顺序
    df = df[EXPECTED_COLUMNS].copy()
    return df

def safe_strip(x):
    if isinstance(x, str):
        return x.strip()
    return x

def maybe_fix_encoding(s: str) -> str:
    # 尝试修复常见mojibake:latin1误解码的UTF-8
    if not isinstance(s, str) or s == "":
        return s
    try:
        repaired = s.encode("latin1").decode("utf-8")

        # 简单启发:若中文字符数量增加则采用
        def zh_count(txt):
            return sum(1 for ch in txt if '\u4e00' <= ch <= '\u9fff')

        if zh_count(repaired) > zh_count(s):
            return repaired
    except Exception:
        pass
    return s

def normalize_region(s: str) -> Tuple[str, bool]:
    if not isinstance(s, str):
        return s, False
    original = s
    s1 = maybe_fix_encoding(s).strip()
    key = s1.replace(" ", "_").lower()
    normalized = REGION_MAP.get(key, REGION_MAP.get(s1, None))
    if normalized:
        return normalized, (normalized != original)
    # 若包含已知中文区域词根则归一到标准值
    for cand in ["华东", "华南", "华北", "华中", "西南", "西北", "东北"]:
        if cand in s1:
            return cand, (cand != original)
    # 无法识别则保留原值
    return s1, (s1 != original)

def normalize_status(s: str) -> Tuple[str, bool]:
    if not isinstance(s, str):
        return s, False
    original = s
    key = s.strip().lower()
    normalized = STATUS_MAP.get(key, STATUS_MAP.get(s.strip(), s.strip()))
    return normalized, (normalized != original)

def parse_discount(x) -> Tuple[float, bool]:
    """返回 (discount_in_0_1, changed?)"""
    if x is None or (isinstance(x, float) and np.isnan(x)) or x == "":
        return (0.0, True)
    s = str(x).strip()
    if s.endswith("%"):
        try:
            v = float(s[:-1])
            return (round(v / 100.0, 4), True)
        except ValueError:
            return (0.0, True)
    # 纯数值
    try:
        v = float(s)
        # 若>1,按百分比处理
        if v > 1.0:
            return (round(v / 100.0, 4), True)
        return (round(v, 4), False)
    except ValueError:
        return (0.0, True)

def clean_number(s: Any) -> Tuple[float, bool]:
    """清理数值(去符号、去逗号、去未知值),返回 (float或NaN, changed)"""
    if s is None:
        return (np.nan, False)
    if isinstance(s, (int, float)) and not isinstance(s, bool):
        return (float(s), False)
    txt = str(s).strip()
    original = txt
    # 处理未知/乱码
    if txt in ["???", "??", "--", "-", "N/A", "NA", ""]:
        return (np.nan, True)
    # 去货币符号和逗号
    txt = txt.replace(",", "").replace("¥", "").replace("¥", "").replace("$", "")
    changed = (txt != original)
    try:
        return (float(txt), changed)
    except ValueError:
        return (np.nan, True)

def is_number(s: Any) -> bool:
    try:
        # NaN不视为有效数值,避免缺失值被当作可计算数字
        return not math.isnan(float(str(s)))
    except ValueError:
        return False

def fix_field_misalignment(row: pd.Series) -> Tuple[pd.Series, bool]:
    """
    针对 qty 非数值而后续字段呈现右移的情况进行修正。
    规则:
    - 若 qty 非数值 且 unit_price 为数值 且 currency 非标准币种(或为数值),
      则认为 item 与 qty 误拆分,执行左移一位并合并 item。
    """
    changed = False
    qty = row["qty"]
    unit_price = row["unit_price"]
    currency = row["currency"]

    qty_is_num = is_number(qty)
    unit_price_is_num = is_number(unit_price)

    # 若currency看起来不像币种(例如为数字),或长度>4且非纯字母,视为错位信号
    currency_suspect = (is_number(currency) or
                        (isinstance(currency, str) and len(currency) > 4 and not currency.isalpha()))

    if (not qty_is_num) and unit_price_is_num and currency_suspect:
        # 左移一位:item = item + qty; 其余字段整体左移
        order = ["item", "qty", "unit_price", "currency", "amount", "discount", "status", "note"]
        vals = [row[k] for k in order]
        # 合并 item 与 qty
        new_item = str(vals[0]) + str(vals[1])
        shifted = [new_item] + vals[2:] + ["字段错位-自动修复"]  # 左移并填补note
        for k, v in zip(order, shifted):
            row[k] = v
        changed = True
    return row, changed

def parse_date_str(s: Any) -> Tuple[str, bool, str]:
    """
    解析日期,统一为 YYYY-MM-DD。
    返回: (标准化日期或空, 是否更改, 问题说明)
    策略:
    - 优先尝试格式:%Y-%m-%d, %Y/%m/%d, %m/%d/%Y
    - 若失败,尝试 %Y-%d-%m(处理'2024-13-01' -> 2024-01-13)
    - 对歧义 '1/6/2024' 按业务上下文(样本多为1月)假定为 MM/DD/YYYY => 2024-01-06
    """
    if not isinstance(s, str) or s.strip() == "":
        return "", False, "empty"
    s = s.strip()
    fmts = ["%Y-%m-%d", "%Y/%m/%d", "%m/%d/%Y"]
    for f in fmts:
        try:
            d = dt.datetime.strptime(s, f).date()
            return d.isoformat(), (s != d.isoformat()), ""
        except ValueError:
            pass
    # 尝试 %Y-%d-%m
    try:
        d = dt.datetime.strptime(s, "%Y-%d-%m").date()
        # 记录更正
        return d.isoformat(), True, "swapped_day_month"
    except ValueError:
        pass
    return "", False, "invalid"

def currency_normalize(row: pd.Series, base_ccy: str, rates: Dict[str, float]) -> Tuple[pd.Series, bool]:
    """将不同币种换算为基础币种,保留 original_* 字段,确保可追溯。"""
    changed = False
    # 原始值备份(若无则写入)
    if "original_currency" not in row or pd.isna(row.get("original_currency", np.nan)):
        row["original_currency"] = row["currency"]
    if "original_unit_price" not in row or pd.isna(row.get("original_unit_price", np.nan)):
        row["original_unit_price"] = row["unit_price"]
    if "original_amount" not in row or pd.isna(row.get("original_amount", np.nan)):
        row["original_amount"] = row["amount"]

    ccy = str(row["currency"]).strip() if isinstance(row["currency"], str) else row["currency"]
    ccy_upper = str(ccy).upper()

    # 同义词归一
    if ccy_upper in ["RMB", "CNY", "¥", "¥"]:
        ccy_upper = "CNY"
    if ccy_upper not in rates:
        # 未知币种,保持不变,仅记录
        return row, changed

    if ccy_upper != base_ccy:
        rate = rates[ccy_upper]
        if is_number(row["unit_price"]):
            row["unit_price"] = round(float(row["unit_price"]) * rate, 4)
            changed = True
        if is_number(row["amount"]):
            row["amount"] = round(float(row["amount"]) * rate, 4)
            changed = True
        row["currency"] = base_ccy
        if changed:
            row["issue_flags"] = append_flag(row.get("issue_flags", ""), "currency_converted")
    return row, changed

def append_flag(flags, new_flag: str) -> str:
    # 兼容NaN/None等非字符串输入,避免标记写入时报错
    s = flags if isinstance(flags, str) else ""
    s = s.strip()
    if s == "":
        return new_flag
    parts = set(p.strip() for p in s.split(",") if p.strip())
    parts.add(new_flag)
    return ",".join(sorted(parts))

def compute_expected_amount(qty, unit_price, discount):
    if any([pd.isna(qty), pd.isna(unit_price), pd.isna(discount)]):
        return np.nan
    try:
        return round(float(qty) * float(unit_price) * (1.0 - float(discount)), 2)
    except (TypeError, ValueError):
        return np.nan

def pre_quality_scan(df: pd.DataFrame) -> Dict[str, Any]:
    m = {}
    m["total_rows_raw"] = len(df)
    m["duplicate_order_id_rows"] = int(df.duplicated(subset=["order_id"]).sum())
    m["missing_qty_rows"] = int((df["qty"].astype(str).replace({"": np.nan}).isna()
                                 | ~df["qty"].astype(str).apply(is_number)).sum())
    m["missing_unit_price_rows"] = int((df["unit_price"].astype(str).replace({"": np.nan}).isna()
                                        | ~df["unit_price"].astype(str).apply(
                                            lambda x: is_number(str(x).replace("$", "").replace(",", "")))).sum())
    m["missing_amount_rows"] = int((df["amount"].astype(str).replace({"": np.nan}).isna()
                                    | ~df["amount"].astype(str).apply(
                                        lambda x: is_number(str(x).replace("$", "").replace(",", "")))).sum())
    # 非标准日期(尝试解析失败计数)
    invalid_dates = 0
    for s in df["order_date"].astype(str).tolist():
        std, chg, note = parse_date_str(s)
        if note == "invalid":
            invalid_dates += 1
    m["invalid_date_rows"] = invalid_dates
    # 非标准地区
    nonstd_region = 0
    for s in df["region"].astype(str).tolist():
        norm, changed = normalize_region(s)
        if changed:
            nonstd_region += 1
    m["region_normalization_needed_rows"] = nonstd_region
    # 非标准状态
    nonstd_status = 0
    for s in df["status"].astype(str).tolist():
        norm, changed = normalize_status(s)
        if changed:
            nonstd_status += 1
    m["status_normalization_needed_rows"] = nonstd_status
    # 可能错位
    misaligned = 0
    for _, row in df.iterrows():
        _, changed = fix_field_misalignment(row.copy())
        if changed:
            misaligned += 1
    m["field_misalignment_rows"] = misaligned
    # 非基础币种
    m["non_base_currency_rows"] = int((~df["currency"].str.upper().isin(["CNY", "RMB"])).sum())
    return m

def post_quality_scan(df: pd.DataFrame) -> Dict[str, Any]:
    m = {}
    m["total_rows_cleaned"] = len(df)
    m["duplicate_order_id_rows"] = int(df.duplicated(subset=["order_id"]).sum())
    m["missing_qty_rows"] = int(df["qty"].isna().sum())
    m["missing_unit_price_rows"] = int(df["unit_price"].isna().sum())
    m["missing_amount_rows"] = int(df["amount"].isna().sum())
    m["invalid_date_rows"] = int(df["order_date"].isna().sum())
    m["non_base_currency_rows"] = int((df["currency"].str.upper() != "CNY").sum())
    # 统计标记
    for flag in ["currency_converted", "amount_corrected", "field_misalignment_fixed", "encoding_fixed",
                 "region_normalized", "status_normalized", "date_corrected", "duplicate_merged",
                 "outlier_qty", "outlier_unit_price"]:
        m[f"flag_{flag}"] = int(df["issue_flags"].fillna("").str.contains(flag).sum())
    return m

def merge_duplicates(df: pd.DataFrame, log: logging.Logger) -> Tuple[pd.DataFrame, int]:
    """
    基于 order_id 合并重复:
    - 数值字段优先非空且校验通过
    - 文本字段合并备注
    - 状态取规范化后一致值,若不一致优先"已支付"
    """
    if df["order_id"].duplicated().sum() == 0:
        return df, 0

    cols = df.columns.tolist()
    groups = []
    removed = 0

    for oid, g in df.groupby("order_id", sort=False):
        if len(g) == 1:
            groups.append(g.iloc[0])
            continue
        removed += len(g) - 1
        # 选取首条为基准
        base = g.iloc[0].copy()
        # 合并 note
        all_notes = [str(x) for x in g["note"].fillna("").tolist() if str(x).strip() != ""]
        base["note"] = ";".join(dict.fromkeys(all_notes)) if all_notes else base.get("note", "")
        # 状态取规范化后一致值,若不一致优先"已支付"
        statuses = g["status"].fillna("").tolist()
        if "已支付" in statuses:
            base["status"] = "已支付"
        else:
            base["status"] = statuses[0] if statuses else base["status"]
        # 对数值字段:qty, unit_price, amount, discount 选非空优先,并在后续统一校验
        for c in ["qty", "unit_price", "amount", "discount"]:
            vals = [v for v in g[c].tolist() if not pd.isna(v)]
            if len(vals) > 0:
                # 若存在多个不同值,记录冲突
                if len(set(str(v) for v in vals)) > 1:
                    base["issue_flags"] = append_flag(base.get("issue_flags", ""), f"duplicate_conflict_{c}")
                base[c] = vals[0]
        # 其他文本字段以首条为主
        base["issue_flags"] = append_flag(base.get("issue_flags", ""), "duplicate_merged")
        groups.append(base)

    df2 = pd.DataFrame(groups, columns=cols)
    return df2, removed

def detect_outliers_iqr(series: pd.Series) -> pd.Series:
    """
    IQR法标记异常:低于 Q1-1.5*IQR 或 高于 Q3+1.5*IQR
    返回布尔序列
    """
    s = series.dropna().astype(float)
    if len(s) < 4:
        return pd.Series([False] * len(series), index=series.index)
    q1, q3 = s.quantile(0.25), s.quantile(0.75)
    iqr = q3 - q1
    low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
    out = (series.astype(float) < low) | (series.astype(float) > high)
    out = out.fillna(False)
    return out

# 主流程(脚本顺序执行,以下参数解析及后续步骤均在模块层运行)
parser = argparse.ArgumentParser()
parser.add_argument("--input", type=str, default="input_orders.csv", help="输入CSV路径")
parser.add_argument("--output-dir", type=str, default="output", help="输出目录")
parser.add_argument("--base-currency", type=str, default=DEFAULT_BASE_CCY, help="基础币种(默认CNY)")
parser.add_argument("--rate-USD", type=float, default=DEFAULT_EXCHANGE_RATES["USD"], help="USD兑基础币种的汇率")
args = parser.parse_args()

os.makedirs(args.output_dir, exist_ok=True)
log_path = setup_logging(args.output_dir)
write_sample_if_missing(args.input)

# 读取
df_raw = read_csv(args.input)
# 保存原始快照
raw_snapshot = os.path.join(args.output_dir, "raw_snapshot.csv")
df_raw.to_csv(raw_snapshot, index=False, encoding="utf-8-sig")
logging.info("原始数据快照保存: %s", raw_snapshot)

# 预评估
pre_metrics = pre_quality_scan(df_raw)
logging.info("清洗前质量评估: %s", json.dumps(pre_metrics, ensure_ascii=False))

df = df_raw.copy()
# 预创建追溯与标记列,避免逐行写回时因列不存在而丢失信息
df["issue_flags"] = ""
for c in ["original_currency", "original_unit_price", "original_amount"]:
    df[c] = np.nan

# 基础清理:去空格
for c in df.columns:
    df[c] = df[c].apply(safe_strip)

# 字段错位修复
misaligned_fixed = 0
new_rows = []
for _, row in df.iterrows():
    row, changed = fix_field_misalignment(row)
    if changed:
        row["issue_flags"] = append_flag(row.get("issue_flags",""), "field_misalignment_fixed")
        misaligned_fixed += 1
    new_rows.append(row)
df = pd.DataFrame(new_rows, columns=df.columns)

# 文本规范化与编码修复
enc_fixed = 0
region_normed = 0
status_normed = 0
std_dates = 0
invalid_dates = 0

std_dates_list = []
region_list = []
status_list = []

for i, row in df.iterrows():
    # 日期
    std, changed, note = parse_date_str(row["order_date"])
    if std != "":
        if changed or (row["order_date"] != std):
            row["issue_flags"] = append_flag(row.get("issue_flags",""), "date_corrected")
            std_dates += 1
        row["order_date"] = std
    else:
        if note == "invalid":
            invalid_dates += 1
            row["issue_flags"] = append_flag(row.get("issue_flags",""), "date_invalid")
            row["order_date"] = np.nan
        else:
            row["order_date"] = np.nan

    # 地区
    before_region = row["region"]
    rnorm, rchg = normalize_region(before_region)
    if rchg:
        region_normed += 1
        row["issue_flags"] = append_flag(row.get("issue_flags",""), "region_normalized")
    if maybe_fix_encoding(before_region) != before_region:
        enc_fixed += 1
        row["issue_flags"] = append_flag(row.get("issue_flags",""), "encoding_fixed")
    row["region"] = rnorm

    # 状态
    snorm, schg = normalize_status(row["status"])
    if schg:
        status_normed += 1
        row["issue_flags"] = append_flag(row.get("issue_flags",""), "status_normalized")
    row["status"] = snorm

    std_dates_list.append(row["order_date"])
    region_list.append(row["region"])
    status_list.append(row["status"])
    df.loc[i] = row

# 数值标准化:qty, unit_price, amount, discount
qty_list = []
up_list = []
amt_list = []
disc_list = []

for i, row in df.iterrows():
    # qty
    q_raw = row["qty"]
    qv_changed = False
    if q_raw is None or str(q_raw).strip()=="":
        q = np.nan
        qv_changed = True
    else:
        # 去掉非数字
        if is_number(q_raw):
            q = float(q_raw)
        else:
            q = np.nan
            qv_changed = True
    # unit_price
    up, chg_up = clean_number(row["unit_price"])
    # amount
    amt, chg_amt = clean_number(row["amount"])
    # discount
    disc, chg_disc = parse_discount(row["discount"])

    if qv_changed or chg_up or chg_amt or chg_disc:
        row["issue_flags"] = append_flag(row.get("issue_flags",""), "numeric_cleaned")

    qty_list.append(q if (pd.isna(q) or q.is_integer()) else q)  # 可能是浮点,但后续转int
    up_list.append(up)
    amt_list.append(amt)
    disc_list.append(disc)
    df.loc[i] = row

df["qty"] = qty_list
df["unit_price"] = up_list
df["amount"] = amt_list
df["discount"] = disc_list

# 货币转换到基础币种
rates = dict(DEFAULT_EXCHANGE_RATES)
if args.base_currency.upper() not in rates:
    rates[args.base_currency.upper()] = 1.0
# 覆盖USD汇率
rates["USD"] = float(args.rate_USD) if hasattr(args, "rate_USD") else DEFAULT_EXCHANGE_RATES["USD"]

conv_changed = 0
for i, row in df.iterrows():
    row, changed = currency_normalize(row, args.base_currency.upper(), rates)
    if changed:
        conv_changed += 1
    df.loc[i] = row

# 金额核对与修正:amount = qty * unit_price * (1 - discount)
amount_corrected = 0
for i, row in df.iterrows():
    exp_amt = compute_expected_amount(row["qty"], row["unit_price"], row["discount"])
    if not pd.isna(exp_amt):
        if pd.isna(row["amount"]) or (abs(float(row["amount"]) - exp_amt) > 0.01):
            df.at[i, "amount"] = exp_amt
            df.at[i, "issue_flags"] = append_flag(row.get("issue_flags",""), "amount_corrected")
            amount_corrected += 1

# 数据类型微调
# qty尽可能为整数
def to_int_or_nan(v):
    if pd.isna(v):
        return np.nan
    try:
        if float(v).is_integer():
            return int(float(v))
        return int(round(float(v)))
    except:
        return np.nan

df["qty"] = df["qty"].apply(to_int_or_nan)
# 单价保留两位
df["unit_price"] = df["unit_price"].apply(lambda x: round(float(x), 4) if (not pd.isna(x)) else x)
df["amount"] = df["amount"].apply(lambda x: round(float(x), 2) if (not pd.isna(x)) else x)

# 合并重复订单
df, removed_dup = merge_duplicates(df, logging)
logging.info("重复记录合并完成,移除行数: %s", removed_dup)

# 分组异常检测(按 item 分组)
df["outlier_qty"] = False
df["outlier_unit_price"] = False
for item, g in df.groupby("item"):
    idx = g.index
    if g["qty"].notna().sum() >= 4:
        out_q = detect_outliers_iqr(g["qty"].astype(float))
        df.loc[idx, "outlier_qty"] = out_q
    if g["unit_price"].notna().sum() >= 4:
        out_p = detect_outliers_iqr(g["unit_price"].astype(float))
        df.loc[idx, "outlier_unit_price"] = out_p

# 将异常标记写入 issue_flags,不进行值修改以保护业务信息
df.loc[df["outlier_qty"]==True, "issue_flags"] = df.loc[df["outlier_qty"]==True, "issue_flags"].apply(lambda x: append_flag(x, "outlier_qty"))
df.loc[df["outlier_unit_price"]==True, "issue_flags"] = df.loc[df["outlier_unit_price"]==True, "issue_flags"].apply(lambda x: append_flag(x, "outlier_unit_price"))

# 事后质量评估
post_metrics = post_quality_scan(df)

# 输出文件
cleaned_path = os.path.join(args.output_dir, "cleaned_orders.csv")
report_path = os.path.join(args.output_dir, "quality_report.json")

# 输出顺序与追溯字段
output_cols = [
    "order_id","order_date","customer_id","region","item",
    "qty","unit_price","currency","amount","discount",
    "status","note",
    "original_currency","original_unit_price","original_amount",
    "issue_flags","outlier_qty","outlier_unit_price"
]
for c in output_cols:
    if c not in df.columns:
        df[c] = np.nan

df = df[output_cols]
df.to_csv(cleaned_path, index=False, encoding="utf-8-sig")

full_report = {
    "run_time": dt.datetime.now().isoformat(),
    "base_currency": args.base_currency.upper(),
    "exchange_rates_used": rates,
    "paths": {
        "log": log_path,
        "raw_snapshot": raw_snapshot,
        "cleaned_csv": cleaned_path
    },
    "metrics_before": pre_metrics,
    "metrics_after": post_metrics,
    "notes": {
        "field_misalignment_fixed": "检测到qty非数值且后续字段错位时,将item与qty合并并整体左移一位",
        "date_correction": "按多格式解析并在必要时采用YYYY-DD-MM作为兜底修正(示例: 2024-13-01 -> 2024-01-13)",
        "currency_conversion": "将USD等换算为基础币种CNY,保留original_*追溯;汇率来自脚本参数",
        "amount_validation": "使用 amount = qty * unit_price * (1 - discount) 校验并修正",
        "duplicates_merge": "按order_id合并重复并合并备注,保留非空且一致性更高的值",
        "outlier_detection": "按item分组使用IQR法标记极端值但不自动修改"
    }
}
with open(report_path, "w", encoding="utf-8") as f:
    json.dump(full_report, f, ensure_ascii=False, indent=2)

logging.info("清洗完成。清洗文件: %s, 质量报告: %s", cleaned_path, report_path)
print("\n=== 输出路径 ===")
print(f"清洗后CSV: {cleaned_path}")
print(f"质量报告JSON: {report_path}")
print(f"处理日志: {log_path}")


清洗效果评估

  • 输出清洗后CSV数据路径

    • 清洗后CSV: ./output/cleaned_orders.csv
    • 质量报告JSON: ./output/quality_report.json
    • 处理日志: ./output/cleaning_log.txt
  • 关键指标对比(基于所给样例数据的预期结果,实际以脚本输出为准)

    • 记录数: 清洗前 12 行 -> 清洗后 11 行(重复订单ID=1001合并1行)
    • 缺失/异常字段(行数)
      • qty: 前 2 行(1002缺失;1009错位导致非数值)-> 后 1 行(仅1002仍缺失)
      • unit_price: 前 1 行(1007为“???”)-> 后 1 行(仍无法恢复)
      • amount: 前 2 行(1002缺失、1007为“??”) -> 后 2 行(1002、1007仍缺失或不可推导)
    • 日期
      • 无法解析/错误: 前 1 行(2024-13-01)-> 后 0 行(修正为2024-01-13)
    • 地区/状态规范化
      • 地区需规范化: 前约 3 行(Hua Dong、华東、编码异常)-> 后 0 行
      • 状态需规范化: 前 1 行(PAID)-> 后 0 行
    • 币种与金额
      • 非基础币种: 前 1 行(USD)-> 后 0 行(已换算为CNY,保留original_*追溯)
      • 金额与公式不一致修正: 1 行(1005从60修正为75)
    • 字段错位修复
      • 1 行(1009 生鲜/组合)-> 已修复并标记 field_misalignment_fixed
    • 异常值标记(分组IQR)
      • 样本量受限,可能仅标记极端分布时的qty或单价为outlier,不自动更改
  • 业务价值评估

    • 可用数据提升:去重与错位修复后,订单明细结构化准确性显著提升;日期、地区、状态统一后便于跨表关联与报表分组统计。
    • 金额可信度提升:金额统一按公式校验并修正,消除手工录入偏差,保障营收指标正确。
    • 跨币种一致性:自动换算为基础币种,支撑统一的收入分析和毛利计算;同时保留original_*字段,确保审计可追溯。
    • 风险控制:对异常与潜在质量问题(如编码、错位、缺失)进行标记,便于后续人工复核或回溯。

优化与持续监控建议

  • 数据入口治理
    • 上游系统强校验:在录入层面强制字段类型(qty为整数、单价/金额为数值、日期为YYYY-MM-DD)。
    • 下拉字典统一:地区、状态、币种采用受控词表,避免自由文本(例如“Hua Dong”“华東”等变体)。
    • 文件编码统一为UTF-8,并在ETL入口做编码探测与强制转码。
  • 金额与汇率
    • 汇率来源改为权威API或主数据系统,并按生效日期保留汇率版本与换算时间戳。
    • 价格/金额校验前置到订单创建流程,异常即时阻断或提示。
  • 异常监控
    • 基于历史分布的自适应阈值,对qty、单价、折扣的异常进行持续监控并告警。
    • 按item/region维度输出日常质量看板(缺失率、异常率、修正率、重复率)。
  • 追溯与审计
    • 保留original_*与issue_flags字段,坚持“不丢信息、可回溯”原则。
    • 对“自动修正”类规则(金额修正、错位修复、日期纠偏)建立变更审计表与抽样复核流程。
  • 性能与自动化
    • 调度:将本脚本纳入定时任务(如Airflow/Cron),对新到文件自动执行。
    • 测试:增加单元测试与样例快照测试,防止规则变更导致回归问题。
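下面给出一个最小的 pytest 测试草案,演示如何为日期纠偏与IQR标记固化预期行为。假设已将 parse_date_str、detect_outliers_iqr 等函数抽取到可单独导入的模块(此处模块名 cleaning_rules 为假设;直接导入脚本会触发整个清洗流程,不适合做单测):

# test_cleaning_rules.py —— 最小测试草案,模块名 cleaning_rules 为假设
import pandas as pd
from cleaning_rules import parse_date_str, detect_outliers_iqr

def test_parse_date_str_swaps_day_month():
    # "2024-13-01" 应按兜底规则纠正为 2024-01-13
    std, changed, note = parse_date_str("2024-13-01")
    assert std == "2024-01-13"
    assert changed is True and note == "swapped_day_month"

def test_detect_outliers_iqr_flags_extreme_value():
    # 末尾的 1000 明显偏离其余取值,应被IQR法标记;其余不应被标记
    s = pd.Series([1.0, 2.0, 2.0, 3.0, 2.0, 1000.0])
    flags = detect_outliers_iqr(s)
    assert bool(flags.iloc[-1]) is True
    assert int(flags.iloc[:-1].sum()) == 0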

说明

  • 脚本默认基础币种为CNY,USD汇率默认7.0,请在运行时按需通过参数覆盖,汇率取值应由业务或财务确认。
  • 所有修正均通过issue_flags标记,并保留original_*字段,确保透明、可追溯。
  • 对无法可靠推导的数据(如“???”单价、金额双缺失)不盲目填充,避免损害数据完整性。
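作为补充,下面的小段代码演示如何读取上文脚本输出的 quality_report.json,逐项打印清洗前后关键指标的对比(假设已按默认参数运行过脚本,报告位于 output/ 目录):

import json

# 读取质量报告,对比 metrics_before 与 metrics_after
with open("output/quality_report.json", encoding="utf-8") as f:
    report = json.load(f)

before, after = report["metrics_before"], report["metrics_after"]
for key in sorted(set(before) | set(after)):
    print(f"{key}: {before.get(key, '-')} -> {after.get(key, '-')}")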

Python自动化清洗脚本 以下脚本可直接运行:会先尝试从 ./data/input.csv 读取数据;若不存在,会自动用你提供的样例内容生成该文件。脚本将完成数据质量评估、字段错位修复、缺失值与格式统一、异常值标记、去重、输出清洗结果与质量报告,并生成处理日志,保证可追溯。

请将以下代码保存为 clean_events.py 后执行:python clean_events.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

""" 事件数据自动清洗脚本

  • 解析CSV并修复字段错位
  • 质量评估:缺失、重复、异常、格式不一致
  • 清洗:文本清理、格式统一、缺失推断、异常标记、轻度去重
  • 输出:清洗CSV、重复数据CSV、质量报告JSON、处理日志 符合用户偏好:文本清理、格式统一、缺失推断、标记、删除(仅安全去重) """

import os
import re
import json
import math
import logging
import unicodedata
from pathlib import Path
from datetime import datetime
import pandas as pd
import numpy as np
import calendar

# --------------------------
# 配置
# --------------------------

BASE_DIR = Path(".").resolve()
DATA_DIR = BASE_DIR / "data"
OUT_DIR = BASE_DIR / "output"
DATA_DIR.mkdir(parents=True, exist_ok=True)
OUT_DIR.mkdir(parents=True, exist_ok=True)

INPUT_CSV = DATA_DIR / "input.csv"
CLEANED_CSV = OUT_DIR / "cleaned_events.csv"
DUPLICATES_CSV = OUT_DIR / "duplicates.csv"
ANOMALIES_CSV = OUT_DIR / "anomalies_flagged.csv"
QUALITY_REPORT_JSON = OUT_DIR / "quality_report.json"
LOG_FILE = OUT_DIR / "cleaning.log"

# --------------------------
# 日志
# --------------------------

logging.basicConfig(
    filename=str(LOG_FILE),
    filemode="w",
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)

# --------------------------
# 若无输入文件,写入样例
# --------------------------

SAMPLE_CSV = """event_id,event_time,user_id,action,channel,device,city,duration_ms,spend,spend_unit,remark
e1,2024-02-01 10:01:02,U100,click,app,Android,深圳,450,0,CNY,正常
e1,2024-02-01 10:01:02,U100,click,app,Android,深圳,450,0,CNY,重复事件
e2,02/01/2024 10:02:03,U101,Click,App,iOS,深圳,abc,0,CNY,时长非数字
e3,2024/02/01 10:03:04,U102,regsitser,web,Web,上海,2300,0,CNY,拼写错误
e4,2024-02-30 09:00:00,U103,login,web,Web,广州,500,0,CNY,日期错误
e5,2024-02-01T10:05:06+08:00,U104,购买,app,Android,深圳,500,2.5,yuan,单位不统一
e6,2024-02-01 10:06:07,U105,purchase,app,Android,深圳,南山区,600,3,CNY,字段错位
e7,2024-02-01 10:07:08,U106,点击??,app,Android,厦门,1200000,0,CNY,异常值与乱码
e8,2024-02-01 10:08:09,U107,購買,app,Android,深圳,650,1,CNY,编码不规范
e9,2024-02-01 10:09:10,U108,logout,app,Android,ShenZhen,50,0,CNY,城市英文混入
"""

if not INPUT_CSV.exists():
    INPUT_CSV.write_text(SAMPLE_CSV, encoding="utf-8")
    logger.info(f"Wrote sample CSV to {INPUT_CSV}")

# --------------------------
# 工具函数
# --------------------------

def normalize_text(s: str) -> str:
    """Unicode标准化 + 去除不可见字符 + 去掉两端空白"""
    if pd.isna(s):
        return s
    s = str(s)
    s = unicodedata.normalize("NFKC", s)
    # 去除常见乱码占位符(U+FFFD)
    s = s.replace("\ufffd", "")
    # 压缩重复问号等
    s = re.sub(r"\?{2,}", "?", s)
    # 去除控制字符
    s = "".join(ch for ch in s if unicodedata.category(ch)[0] != "C")
    return s.strip()

def looks_like_subcity(token: str) -> bool:
    """判断是否像城市子级(区/县/市等),用于字段错位修复"""
    if not token:
        return False
    t = token.strip()
    if re.search(r"[0-9]", t):
        return False
    # 包含常见中文行政区后缀且较短
    if re.search(r"(区|县|市|镇|乡)$", t) and 1 <= len(t) <= 6:
        return True
    # 英文且较短,可能是城市拼写碎片
    if re.match(r"^[A-Za-z-]{2,10}$", t):
        return True
    return False

def fix_row_tokens(header_cols, tokens): """修复字段错位:将city后多出的行政区碎片合并到city;其余多余字段并入remark末尾""" ncols = len(header_cols) city_idx = header_cols.index("city") remark_idx = header_cols.index("remark") notes = [] repaired = False

# 先合并city后面的一个或多个碎片
while len(tokens) > ncols and looks_like_subcity(tokens[city_idx + 1]):
    tokens[city_idx] = f"{tokens[city_idx].strip()}-{tokens[city_idx + 1].strip()}"
    del tokens[city_idx + 1]
    repaired = True
    notes.append("fix_city_merge_subcity")

# 仍多出的部分:并入remark(用;拼接),保持长度一致
if len(tokens) > ncols:
    overflow = tokens[ncols - 1:]  # remark及其后的所有
    base_remark = tokens[remark_idx] if remark_idx < len(tokens) else ""
    merged = ";".join([base_remark] + overflow[1:]).strip(";")
    tokens = tokens[:remark_idx] + [merged]
    repaired = True
    notes.append("fix_remark_merge_overflow")

# 不足则补空
if len(tokens) < ncols:
    tokens = tokens + [""] * (ncols - len(tokens))
    repaired = True
    notes.append("fix_pad_empty")

return tokens, repaired, notes

def parse_event_time_safe(s: str): """稳健解析event_time;若无效日期(如2月30日),回退为该月最后一天;保留本地时区时间(去tz)""" s0 = s s = normalize_text(s) ts = pd.to_datetime(s, errors="coerce") fixed = False reason = None

if pd.isna(ts):
    # 检测YYYY-MM-DD HH:MM:SS形式的非法日
    m = re.match(r"^\s*(\d{4})[-/](\d{2})[-/](\d{2})[ T](\d{2}):(\d{2}):(\d{2})\s*$", s)
    if m:
        y, mo, d, hh, mm, ss = map(int, m.groups())
        # 若日期非法,调整为当月最后一天
        last_day = calendar.monthrange(y, mo)[1]
        if d > last_day:
            d = last_day
            fixed = True
            reason = "fixed_invalid_day_to_month_end"
        try:
            ts = pd.Timestamp(year=y, month=mo, day=d, hour=hh, minute=mm, second=ss)
        except Exception:
            ts = pd.NaT

if isinstance(ts, pd.Timestamp) and ts.tz is not None:
    # 转为Asia/Shanghai并去掉tz
    try:
        ts = ts.tz_convert("Asia/Shanghai").tz_localize(None)
        fixed = True
        if not reason:
            reason = "tz_converted_to_Asia_Shanghai"
    except Exception:
        # 有些tz是naive;忽略
        ts = ts.tz_localize(None)
        fixed = True
        if not reason:
            reason = "tz_removed"

return ts, fixed, reason

def standardize_action(raw: str): """动作标准化到集合:click/register/login/purchase/logout,返回标准值与是否修正""" s = normalize_text(raw).lower() # 去除非字母汉字 s2 = re.sub(r"[^a-z\u4e00-\u9fa5]", "", s)

mapping = {
    "click": "click",
    "注册": "register",
    "注册用户": "register",
    "register": "register",
    "登录": "login",
    "login": "login",
    "登出": "logout",
    "logout": "logout",
    "购买": "purchase",
    "購買": "purchase",
    "买": "purchase",
    "purchase": "purchase",
    "点击": "click",
}

# 精确或近似匹配
candidates = [s2, s]
for c in candidates:
    if c in mapping:
        val = mapping[c]
        return val, (val != raw)

# 常见拼写错误修复
if s2 in {"regsitser", "regsiter", "regist", "regster"}:
    return "register", True
if s2 in {"logon"}:
    return "login", True

# 模糊:中文里含“购”“买”
if re.search(r"[购買买]", s2):
    return "purchase", True
if "点" in s2 or "击" in s2:
    return "click", True

# 未知,保留原文但标记
return raw, False

def standardize_channel(raw: str):
    s = normalize_text(raw).lower()
    return s, (s != raw)

def standardize_device(raw: str):
    s = normalize_text(raw).lower()
    mapping = {"android": "Android", "ios": "iOS", "web": "Web"}
    if s in mapping:
        return mapping[s], (mapping[s] != raw)
    # 保底:首字母大写
    std = s.capitalize()
    return std, (std != raw)

def standardize_city(raw: str):
    s = normalize_text(raw)
    # 统一英文/大小写到中文名
    m = s.lower()
    mapping = {
        "shenzhen": "深圳",
        "shen zhen": "深圳",
    }
    if m in mapping:
        return mapping[m], (mapping[m] != raw)
    # 若包含“-子级”,保留
    return s, (s != raw)

def to_float_safe(x):
    x = normalize_text(x)
    if x == "" or x is None:
        return np.nan
    try:
        return float(x)
    except (TypeError, ValueError):
        # 抽取数字
        m = re.search(r"-?\d+(\.\d+)?", x)
        return float(m.group()) if m else np.nan

def to_int_safe(x):
    f = to_float_safe(x)
    if pd.isna(f):
        return np.nan
    try:
        return int(round(f))
    except (TypeError, ValueError):
        return np.nan

def iqr_outlier_flags(series: pd.Series): """返回是否高/低异常(IQR方法),不足样本时退化为硬阈值""" s = pd.to_numeric(series, errors="coerce") valid = s.dropna() high = pd.Series([False]*len(series), index=series.index) low = pd.Series([False]*len(series), index=series.index)

if len(valid) >= 5:
    q1, q3 = valid.quantile(0.25), valid.quantile(0.75)
    iqr = q3 - q1
    upper = q3 + 1.5 * iqr
    lower = q1 - 1.5 * iqr
    high = s > upper
    low = s < lower
else:
    # 退化阈值:负值或超过30分钟
    high = s > 30 * 60 * 1000
    low = s < 0
return low.fillna(False), high.fillna(False)

--------------------------

读入与字段错位修复

--------------------------

with INPUT_CSV.open("r", encoding="utf-8") as f: lines = [line.rstrip("\n") for line in f]

if not lines: raise RuntimeError("输入CSV为空")

header = [h.strip() for h in lines[0].split(",")] raw_rows = [] fix_notes = [] for i, line in enumerate(lines[1:], start=2): # 从第2行起(人类计数) tokens = [t.strip() for t in line.split(",")] tokens, repaired, notes = fix_row_tokens(header, tokens) raw_rows.append(tokens) fix_notes.append({"line_number": i, "repaired": repaired, "notes": notes})

df = pd.DataFrame(raw_rows, columns=header)

保留一份原始副本(修复错位后的原始值)

df_orig = df.copy(deep=True)

添加可追溯列

df["_clean_notes"] = ""

def add_note(idx, note): df.at[idx, "_clean_notes"] = (df.at[idx, "_clean_notes"] + ";" + note).strip(";")

# --------------------------
# 初步质量评估(修复错位后)
# --------------------------

rows_before = len(df)
dup_by_eventid_before = int(df.duplicated(subset=["event_id"]).sum())
dup_full_before = int(df.duplicated().sum())
# 转为内置int,便于后续写入JSON报告
missing_before = {k: int(v) for k, v in df.replace({"": np.nan}).isna().sum().items()}

logger.info(f"Rows before: {rows_before}, dup(event_id): {dup_by_eventid_before}, dup(full): {dup_full_before}")
logger.info(f"Missing before (empty as NaN): {missing_before}")

--------------------------

文本清理与标准化

--------------------------

统一去空白/标准化可见字符

for col in df.columns: df[col] = df[col].apply(lambda x: normalize_text(x))

时间解析与修复

df["event_time_parsed"] = pd.NaT df["event_time_fixed"] = False df["event_time_fix_reason"] = "" for idx, val in df["event_time"].items(): ts, fixed, reason = parse_event_time_safe(val) df.at[idx, "event_time_parsed"] = ts df.at[idx, "event_time_fixed"] = bool(fixed) df.at[idx, "event_time_fix_reason"] = reason if reason else "" if fixed: add_note(idx, f"time_fixed:{reason}") df["event_time"] = df["event_time_parsed"] df.drop(columns=["event_time_parsed"], inplace=True)

动作标准化

df["action_std_changed"] = False for idx, val in df["action"].items(): std, changed = standardize_action(val) df.at[idx, "action"] = std df.at[idx, "action_std_changed"] = bool(changed) if changed: add_note(idx, f"action_std:{val}->{std}")

渠道/设备标准化

df["channel_std_changed"] = False df["device_std_changed"] = False for idx in df.index: ch, chg = standardize_channel(df.at[idx, "channel"]) df.at[idx, "channel"] = ch df.at[idx, "channel_std_changed"] = bool(chg) dv, dchg = standardize_device(df.at[idx, "device"]) df.at[idx, "device"] = dv df.at[idx, "device_std_changed"] = bool(dchg) if chg: add_note(idx, "channel_std") if dchg: add_note(idx, "device_std")

城市标准化

df["city_std_changed"] = False for idx, val in df["city"].items(): std, changed = standardize_city(val) df.at[idx, "city"] = std df.at[idx, "city_std_changed"] = bool(changed) if changed: add_note(idx, f"city_std:{val}->{std}")

数值化

df["duration_ms_raw"] = df["duration_ms"] df["duration_ms"] = df["duration_ms"].apply(to_int_safe)

df["spend_raw"] = df["spend"] df["spend"] = df["spend"].apply(to_float_safe)

货币单位统一

df["spend_unit_std_changed"] = False for idx, val in df["spend_unit"].items(): v = normalize_text(val).lower() std = v if v in {"cny", "rmb", "元", "yuan"}: std = "CNY" df.at[idx, "spend_unit_std_changed"] = (std != val) df.at[idx, "spend_unit"] = std if std != val: add_note(idx, f"spend_unit_std:{val}->{std}")

--------------------------

异常值检测与缺失推断

--------------------------

duration异常值标记(不篡改,只标记)

low_flag, high_flag = iqr_outlier_flags(df["duration_ms"]) df["duration_outlier_low"] = low_flag.values df["duration_outlier_high"] = high_flag.values df["duration_outlier"] = df["duration_outlier_low"] | df["duration_outlier_high"]

用稳健组内中位数推断缺失(排除异常值),分组键可用 action

df["duration_imputed"] = False if df["duration_ms"].isna().any(): # 计算组内中位数(排除异常值) robust_median = ( df.loc[~df["duration_outlier"], ["action", "duration_ms"]] .groupby("action", dropna=False)["duration_ms"] .median() ) for idx in df.index[df["duration_ms"].isna()]: g = df.at[idx, "action"] med = robust_median.get(g, np.nan) if pd.isna(med): med = df.loc[~df["duration_outlier"], "duration_ms"].median() if not pd.isna(med): df.at[idx, "duration_ms"] = int(round(med)) df.at[idx, "duration_imputed"] = True add_note(idx, f"duration_imputed:{med}")

# --------------------------
# 质量标记:文本乱码/编码不规范
# --------------------------

def is_text_garbled(s: str) -> bool:
    if s is None:
        return False
    s = str(s)
    # 含连续问号或替换符(U+FFFD)视为乱码痕迹
    return ("??" in s) or ("\ufffd" in s)

df["remark_garbled"] = df["remark"].apply(is_text_garbled)
df["action_garbled"] = df["action"].apply(is_text_garbled)
for idx in df.index:
    if df.at[idx, "remark_garbled"] or df.at[idx, "action_garbled"]:
        add_note(idx, "text_garbled_cleaned")

--------------------------

去重策略(保留首条):按event_id

--------------------------

df["is_duplicate_eventid"] = df.duplicated(subset=["event_id"], keep="first") dups = df.loc[df["is_duplicate_eventid"]].copy() if not dups.empty: dups.to_csv(DUPLICATES_CSV, index=False, encoding="utf-8-sig") logger.info(f"Saved duplicates to {DUPLICATES_CSV} count={len(dups)}")

df_clean = df.loc[~df["is_duplicate_eventid"]].copy() rows_after = len(df_clean)

--------------------------

输出清洗结果与标记

--------------------------

整理列顺序:业务列 + 标记列 + 追溯列

biz_cols = ["event_id", "event_time", "user_id", "action", "channel", "device", "city", "duration_ms", "spend", "spend_unit", "remark"] flag_cols = [ "event_time_fixed", "event_time_fix_reason", "action_std_changed", "channel_std_changed", "device_std_changed", "city_std_changed", "spend_unit_std_changed", "duration_outlier", "duration_outlier_low", "duration_outlier_high", "duration_imputed", "is_duplicate_eventid" ] trace_cols = ["duration_ms_raw", "spend_raw", "_clean_notes"]

保存异常行(比如异常时长/乱码等)

anomalies = df_clean.loc[df_clean["duration_outlier"] | df_clean["remark_garbled"] | df_clean["action_garbled"]].copy() if not anomalies.empty: anomalies.to_csv(ANOMALIES_CSV, index=False, encoding="utf-8-sig") logger.info(f"Saved anomalies to {ANOMALIES_CSV} count={len(anomalies)}")

df_clean[biz_cols + flag_cols + trace_cols].to_csv(CLEANED_CSV, index=False, encoding="utf-8-sig") logger.info(f"Saved cleaned CSV to {CLEANED_CSV} rows={len(df_clean)}")

--------------------------

质量报告

--------------------------

report = { "rows_before": rows_before, "rows_after": rows_after, "duplicates_by_eventid_before": int(dup_by_eventid_before), "duplicates_removed": int(dup_by_eventid_before), "full_row_duplicates_before": int(dup_full_before), "missing_before_empty_as_nan": missing_before, "datetime_fixed_count": int(df["event_time_fixed"].sum()), "duration_missing_after_impute": int(df_clean["duration_ms"].isna().sum()), "duration_imputed_count": int(df_clean["duration_imputed"].sum()), "duration_outlier_count": int(df_clean["duration_outlier"].sum()), "action_standardized_count": int(df_clean["action_std_changed"].sum()), "channel_standardized_count": int(df_clean["channel_std_changed"].sum()), "device_standardized_count": int(df_clean["device_std_changed"].sum()), "city_standardized_count": int(df_clean["city_std_changed"].sum()), "spend_unit_standardized_count": int(df_clean["spend_unit_std_changed"].sum()), "garbled_text_rows": int((df_clean["remark_garbled"] | df_clean["action_garbled"]).sum()), "outputs": { "cleaned_csv": str(CLEANED_CSV), "duplicates_csv": str(DUPLICATES_CSV), "anomalies_csv": str(ANOMALIES_CSV), "quality_report_json": str(QUALITY_REPORT_JSON), "log_file": str(LOG_FILE), }, "field_misalignment_fixes": fix_notes, } with open(QUALITY_REPORT_JSON, "w", encoding="utf-8") as f: json.dump(report, f, ensure_ascii=False, indent=2)

print(str(CLEANED_CSV)) print(str(QUALITY_REPORT_JSON)) print(str(LOG_FILE))

清洗后的CSV数据路径

  • 清洗结果: ./output/cleaned_events.csv
  • 重复记录留存: ./output/duplicates.csv
  • 异常样本留存: ./output/anomalies_flagged.csv
  • 质量报告(JSON): ./output/quality_report.json
  • 处理日志: ./output/cleaning.log

清洗效果评估 基于你提供的样例数据(10行原始记录,事件e1-e9,其中e1重复1次),预计关键指标对比如下:

  • 行数:清洗前 10 行 -> 清洗后 9 行(按 event_id 去重移除 1 行重复)
  • 去重:按 event_id 去重移除 1 条(e1 的重复事件保存在 duplicates.csv)
  • 无效/异常时间:
    • 发现 1 条非法日期(2024-02-30),已自动修正为 2024-02-29(闰年月末),并加标记 event_time_fixed
    • 时区格式(+08:00)已归一到本地时间并去tz
  • 时长 duration_ms:
    • 非数字(abc)1 条,按偏好采用“缺失推断”:按 action 组内稳健中位数填充(click 组,排除异常值后中位数=450),并标注 duration_imputed
    • 字段错位 1 条(e6:city 后含“南山区”导致错位),已合并为 city=“深圳-南山区”,duration_ms=600,spend=3,spend_unit=CNY,并加追溯说明
    • 异常值标记 1 条(1200000ms>30分钟),仅标记 duration_outlier,不做篡改
  • 标准化与编码:
    • action 标准化约 5 条(Click/购买/購買/点击??/regsitser -> click/purchase/register 等)
    • channel 统一小写约 1 条(App->app)
    • device 统一规范大小写(Web/iOS/Android 保持规范)
    • city 统一约 2 条(ShenZhen->深圳;“深圳-南山区”修复合并)
    • 货币单位统一 1 条(yuan->CNY;汇率未涉及,1:1 直归一,符合中国区场景)
    • 文本乱码/不规范标记 2 条(如“点击??”、“購買”),已清理并标记
  • 缺失值:清洗前可解析缺失(duration_ms)2 起(含错位导致的非数),清洗后 0 起(1 修复错位,1 推断填充)
  • 可追溯性:每行 _clean_notes 汇总所做修复;详细过程见 cleaning.log,汇总见 quality_report.json

业务价值分析

  • 去重避免事件重复计数,保障DAU/转化率等核心指标准确性
  • 时间字段统一与纠错,保证漏斗/留存/时序分析口径一致
  • 动作分类修正,消除“注册/购买/点击”等口径混乱对漏斗转化的干扰
  • 时长异常仅标记不篡改,避免对用户行为强假设带来的偏差;同时通过填充策略让报表可用性提升
  • 货币与城市标准化,保证地域维度和营收口径一致,便于跨渠道对比与归因

优化与持续监控建议

  • 输入契约与校验
    • 强约束 schema:字段数=11;event_id 唯一;event_time 必须可解析;duration_ms>=0;spend>=0;spend_unit∈{CNY}
    • 建立动作字典:{click, register, login, purchase, logout},入库前校验并拒绝未知值
  • 数据字典与维表
    • 城市映射维表(含中文、英文、别名、子区合并策略),减少“ShenZhen/Shenzhen”类问题
  • 异常监控阈值
    • 建议监控:重复率<0.1%,非法时间<0.05%,非数字时长<0.2%,时长异常率<1%
    • 超阈触发告警并落地异常样本供排查
  • 缺失与异常策略
    • 缺失采用“组内稳健中位数”策略已实现;建议定期复核阈值与分组键(如按 action+device)
    • 对极端异常可另行输出黑名单,不直接删除保证可追溯
  • 作业化与审计
    • 调度周期化(如每小时/每日),保留日志与报告90天
    • 版本化清洗规则,变更需评审并在质量报告中体现规则版本号
  • 测试与回归
    • 用小样本单测关键函数:错位修复、时间修复、动作标准化、IQR标记、缺失推断(可参考本节末尾的测试草案)
    • 每次规则更新先在影子作业跑1-2周,确认指标稳定再切主链路
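下面是与上述建议配套的最小测试草案。假设已将 standardize_action、parse_event_time_safe 抽取到可导入的模块(此处模块名 event_rules 为假设;clean_events.py 为脚本式执行,直接 import 会触发整个清洗流程):

# test_event_rules.py —— 关键规则最小测试草案,模块名 event_rules 为假设
import pandas as pd
from event_rules import standardize_action, parse_event_time_safe

def test_standardize_action_fixes_typo():
    # 拼写错误 "regsitser" 应被归一为 register
    std, changed = standardize_action("regsitser")
    assert std == "register" and changed is True

def test_parse_event_time_rolls_invalid_day_to_month_end():
    # 2024-02-30 为非法日期,应回退为闰年2月的最后一天并保留时间
    ts, fixed, reason = parse_event_time_safe("2024-02-30 09:00:00")
    assert fixed is True and reason == "fixed_invalid_day_to_month_end"
    assert ts == pd.Timestamp("2024-02-29 09:00:00")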

说明

  • 脚本仅做安全去重(基于 event_id),且所有被移除记录保存到 duplicates.csv 保证可追溯
  • 对异常值采用“标记不改值”,符合“标记”策略,避免损害数据完整性
  • 对缺失值采用稳健中位数“缺失推断”,并在 duration_imputed 和 _clean_notes 中显式记录
  • 清洗全程不泄露或持久化敏感数据,所有输出仅用于质量评估与复核

如需对接你们的生产路径或调整规则阈值,请告知实际目录与口径,我会给出配置化版本。

Python自动化清洗脚本 下面的脚本可直接运行。若当前目录不存在 input.csv,会自动写入你提供的示例数据并完成全流程清洗、生成日志与质量报告。运行完成后,将在 ./output 目录下生成标准化后的 cleaned.csv、清洗日志 cleaning_log.txt 和质量报告 quality_report.json。

请将以下内容保存为 clean_inventory.py 并运行:python clean_inventory.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

""" 库存/进销存CSV数据清洗脚本

  • 数据质量评估:缺失值、重复值、异常值、格式不一致、编码异常等统计
  • 缺失值处理:按规则推导/填充(如数量、单价、金额三者关系)
  • 数据标准化:日期/数值/文本/货币/单位等
  • 异常值处理:金额修正、文本乱码修复尝试、字段错位修正、重复合并
  • 可追溯:输出原始字段副本与修正标记,生成质量报告与处理日志

使用说明:

  • 默认输入文件:./input.csv(若不存在将写入示例数据)
  • 输出目录:./output
  • 清洗后文件:./output/cleaned.csv
  • 质量报告:./output/quality_report.json
  • 日志文件:./output/cleaning_log.txt

依赖:pandas, numpy 安装:pip install pandas numpy """

import os
import re
import json
import math
import logging
import unicodedata
from datetime import datetime
from typing import List, Dict

# 尝试导入pandas与numpy
try:
    import pandas as pd
    import numpy as np
except Exception as e:
    raise SystemExit("请先安装所需依赖: pip install pandas numpy\n错误: {}".format(e))

----------------------------

配置

----------------------------

INPUT_PATH = "./input.csv" OUTPUT_DIR = "./output" NORMALIZED_PATH = os.path.join(OUTPUT_DIR, "_normalized.csv") CLEANED_PATH = os.path.join(OUTPUT_DIR, "cleaned.csv") REPORT_PATH = os.path.join(OUTPUT_DIR, "quality_report.json") LOG_PATH = os.path.join(OUTPUT_DIR, "cleaning_log.txt")

EXPECTED_COLUMNS = [ "record_id","tx_date","sku","sku_name","warehouse","in_qty","out_qty", "unit","unit_cost","currency","total_cost","batch","remark" ]

BASE_CURRENCY = "CNY"

汇率表(示例,默认不启用自动换算,仅标记异币种;如需启用,请填入可信汇率并将 ENABLE_FX=True)

ENABLE_FX = False FX_RATE = { # "USD": 7.20, # 示例:1 USD = 7.20 CNY }

IQR异常值检测阈值

IQR_K = 1.5

金额修正容差

AMOUNT_TOL = 1e-6

----------------------------

准备输出与日志

----------------------------

os.makedirs(OUTPUT_DIR, exist_ok=True)

logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s", handlers=[ logging.FileHandler(LOG_PATH, encoding='utf-8'), logging.StreamHandler() ] ) log = logging.getLogger("cleaner")

----------------------------

若无输入则写入示例

----------------------------

SAMPLE_CSV = """record_id,tx_date,sku,sku_name,warehouse,in_qty,out_qty,unit,unit_cost,currency,total_cost,batch,remark r001,2024-03-01,S1001,矿泉水500ml,A仓,100,0,瓶,1.2,CNY,120,,入库 r002,2024/03/01,S1001,矿泉水500ML,A仓,,10,箱,30,CNY,300,20240301,数量缺失与单位不同 r003,2024-03-02,S1002,饼干-巧克力味,B仓,50,0,袋,¥2.5,CNY,125,,单价含符号 r004,2024-03-33,S1003,挂面,C仓,30,0,袋,1.8,CNY,5,,金额异常与日期错误 r005,2024-03-03,S1004,牛奶-原味,B仓,5000,0,瓶,3,CNY,15000,,数量异常 r006,2024-03-03,S1004,牛奶—原味,B仓,50,0,瓶,3,CNY,150,,破折号全角 r007,2024-03-04,S1005,酸奶,仓库,20,0,瓶,3,CNY,60,,正常 r007,2024-03-04,S1005,酸奶,仓库,20,0,瓶,3,CNY,60,,重复记录 r009,2024-03-05,S1006,薯片,C仓,10,0,包,2.5,USD,25,,币种不同 r010,2024-03-05,S1007,方便面,C仓,10,0,袋,2.,CNY,20,,单价格式异常 r011,2024-03-06,S1008,辣条,D仓,10,0,袋,2,CNY,20,批次A,备注含逗号,未引用 r012,2024-03-06,S1009,纱海ç›Â,D仓,10,0,袋,2,CNY,20,,文本乱码 r013,2024-03-06,S1010,矿泉誰,E仓,15,0,瓶,2.2,CNY,33,,拼写错误 """

if not os.path.exists(INPUT_PATH): with open(INPUT_PATH, "w", encoding="utf-8") as f: f.write(SAMPLE_CSV) log.info("未找到输入文件,已写入示例数据到 %s", INPUT_PATH)

----------------------------

工具函数

----------------------------

def safe_strip(x): return x.strip() if isinstance(x, str) else x

def nfkc(s: str) -> str: if not isinstance(s, str): return s s = s.strip() s = unicodedata.normalize("NFKC", s) # 标准化破折号/连字符 s = s.replace("—", "-").replace("–", "-").replace("-", "-") # 标准化货币符号空格 s = s.replace("¥", "¥") return s

def looks_mojibake(s: str) -> bool: if not isinstance(s, str): return False # 简单启发式:包含典型乱码片段 return ("Ã" in s) or ("Â" in s)

def fix_mojibake_try(s: str) -> str: """ 尝试修复常见UTF-8->latin1误解码的乱码。 保守处理:若结果不可读则返回原文。 """ if not isinstance(s, str): return s try: tentative = s.encode("latin1", errors="ignore").decode("utf-8", errors="ignore") # 若修复后字符数较为合理且不为空,则返回修复结果 if tentative and tentative != s: return tentative except Exception: pass return s

def parse_numeric(val): """ 数值解析:去除货币符号/千分位/多余文本,转为float """ if pd.isna(val): return np.nan if isinstance(val, (int, float)): return float(val) s = nfkc(str(val)) s = s.replace(",", "") # 去除常见货币/字母 s = re.sub(r"[A-Za-z¥$¥\s]", "", s) # 仅保留数字、小数点和负号 s = re.sub(r"[^0-9.-]", "", s) if s in ("", ".", "-"): return np.nan try: return float(s) except Exception: return np.nan

def parse_date_safe(s, fallback=None): """ 日期解析:支持YYYY-MM-DD或YYYY/MM/DD;非法返回NaT。 若fallback为8位数字批次(YYYYMMDD)且解析失败,则用fallback。 """ if pd.isna(s): s = "" s = nfkc(str(s)) s = s.replace("/", "-") dt = pd.to_datetime(s, errors="coerce", format="mixed") if pd.isna(dt) and fallback: fb = nfkc(str(fallback)) if re.fullmatch(r"\d{8}", fb): try: dt = pd.to_datetime(fb, format="%Y%m%d", errors="coerce") except Exception: dt = pd.NaT return dt

def normalize_csv(input_path, normalized_path, expected_cols: List[str]):
    """
    规范化CSV:
    - 修复未引用逗号导致的列错位:当列数>预期时,合并超出列到最后一列
    - 当列数<预期时,用空值补齐
    - 输出标准化CSV便于pandas读取
    """
    import csv
    with open(input_path, "r", encoding="utf-8") as f_in, \
            open(normalized_path, "w", encoding="utf-8", newline="") as f_out:
        reader = csv.reader(f_in)
        writer = csv.writer(f_out)
        header = next(reader, None)
        # 忽略输入头与预期不一致的情况,使用预期头
        writer.writerow(expected_cols)
        for row in reader:
            if row is None:
                continue
            # 合并多余的列到最后一列remark
            if len(row) > len(expected_cols):
                row = row[:len(expected_cols) - 1] + [",".join(row[len(expected_cols) - 1:])]
            elif len(row) < len(expected_cols):
                row = row + [""] * (len(expected_cols) - len(row))
            writer.writerow(row)
    log.info("已规范化CSV到: %s", normalized_path)

def iqr_outlier_flags(series: pd.Series): """ IQR异常值标记: 返回布尔Series """ s = series.dropna().astype(float) if len(s) < 4: return pd.Series([False]*len(series), index=series.index) q1, q3 = np.percentile(s, [25, 75]) iqr = q3 - q1 low = q1 - IQR_K * iqr high = q3 + IQR_K * iqr return (series < low) | (series > high)

----------------------------

1) 标准化读取

----------------------------

normalize_csv(INPUT_PATH, NORMALIZED_PATH, EXPECTED_COLUMNS) df = pd.read_csv(NORMALIZED_PATH, dtype=str, keep_default_na=False, na_values=["", "NA", "NaN", "null", "None"])

保存原始副本用于对比

df_raw = df.copy(deep=True)

----------------------------

类型与初步清理

----------------------------

统一去空格、NFKC规范化(含全角转半角、破折号等)

for c in EXPECTED_COLUMNS: df[c] = df[c].apply(nfkc)

数值列解析

for c in ["in_qty", "out_qty", "unit_cost", "total_cost"]: df[c + "_raw"] = df[c] df[c] = df[c].apply(parse_numeric)

日期解析(保留原始)

df["tx_date_raw"] = df["tx_date"] df["tx_date"] = df.apply(lambda r: parse_date_safe(r["tx_date_raw"], fallback=r.get("batch")), axis=1)

文本乱码修复尝试(仅对sku_name/remark/warehouse)

for c in ["sku_name", "remark", "warehouse"]: df[c + "_raw"] = df[c] need_fix = df[c].apply(looks_mojibake) df.loc[need_fix, c] = df.loc[need_fix, c].apply(fix_mojibake_try)

# SKU名称规范(大小写/破折号/单位标记)

def normalize_name(name: str) -> str:
    if not isinstance(name, str):
        return name
    s = nfkc(name)
    # 统一ml大小写(如 500ML -> 500ml)
    s = re.sub(r"ML\b", "ml", s)
    return s

df["sku_name"] = df["sku_name"].apply(normalize_name)

单位与币种规范

df["unit_raw2"] = df["unit"] df["unit"] = df["unit"].apply(nfkc)

df["currency_raw"] = df["currency"] df["currency"] = df["currency"].str.upper().str.strip()

----------------------------

2) 数据质量评估(清洗前统计)

----------------------------

def count_invalid_dates(series): return int(series.isna().sum())

before_report = { "rows": int(len(df_raw)), "missing": { c: int((df_raw[c]=="" ).sum()) for c in EXPECTED_COLUMNS }, "invalid_dates": count_invalid_dates(df["tx_date"]), # 解析后NaT视为无效 "duplicates_by_record_id": int(df_raw.duplicated(subset=["record_id"]).sum()), "currency_heterogeneous": int((df["currency"] != BASE_CURRENCY).sum()), "amount_mismatch": 0, # 稍后计算 "unit_mixed_by_sku": 0, # 稍后计算 "mojibake_suspects": int(df_raw["sku_name"].apply(looks_mojibake).sum()), }

----------------------------

3) 缺失值处理与推导

----------------------------

qc_flags = []

df["qc_flags"] = "" df["notes"] = ""

def add_flag(idx, flag): df.at[idx, "qc_flags"] = "|".join([x for x in [df.at[idx, "qc_flags"], flag] if x])

def add_note(idx, note): df.at[idx, "notes"] = " ; ".join([x for x in [df.at[idx, "notes"], note] if x])

推导数量/单价:

for idx, r in df.iterrows(): in_qty, out_qty, unit_cost, total_cost = r["in_qty"], r["out_qty"], r["unit_cost"], r["total_cost"]

# 填充缺失的in_qty
if (pd.isna(in_qty) or in_qty==0) and not pd.isna(unit_cost) and not pd.isna(total_cost) and unit_cost not in (0, 0.0):
    inferred = total_cost / unit_cost
    df.at[idx, "in_qty"] = inferred
    add_flag(idx, "in_qty_inferred")
    add_note(idx, f"in_qty由total_cost/unit_cost推导={inferred:.6f}")

# 填充缺失的unit_cost
in_qty = df.at[idx, "in_qty"]
out_qty = df.at[idx, "out_qty"]
if pd.isna(unit_cost) and not pd.isna(total_cost):
    qty_ref = None
    if not pd.isna(in_qty) and in_qty != 0:
        qty_ref = in_qty
    elif not pd.isna(out_qty) and out_qty != 0:
        qty_ref = out_qty
    if qty_ref:
        uc = total_cost / qty_ref
        df.at[idx, "unit_cost"] = uc
        add_flag(idx, "unit_cost_inferred")
        add_note(idx, f"unit_cost由total_cost/qty推导={uc:.6f}")

缺失的出库数量默认保持NaN或0不动(不妄填),若为空字符串已转NaN

----------------------------

4) 异常值与金额修正

----------------------------

金额检查:若存在in_qty>0则用in_qty计算,否则用out_qty

mismatch_count = 0 corrected_count = 0 for idx, r in df.iterrows(): in_qty, out_qty, unit_cost, total_cost = r["in_qty"], r["out_qty"], r["unit_cost"], r["total_cost"] if pd.isna(unit_cost): continue qty_ref = None if not pd.isna(in_qty) and in_qty > 0: qty_ref = in_qty elif not pd.isna(out_qty) and out_qty > 0: qty_ref = out_qty if qty_ref is None: continue expected_total = unit_cost * qty_ref if pd.isna(total_cost) or abs(expected_total - total_cost) > AMOUNT_TOL: mismatch_count += 1 # 修正金额 df.at[idx, "total_cost"] = expected_total add_flag(idx, "total_cost_corrected") add_note(idx, f"金额由{total_cost}修正为{expected_total:.6f}") corrected_count += 1

before_report["amount_mismatch"] = mismatch_count

数量异常(IQR)按SKU+单位检测,仅标记不篡改

df["qty_for_outlier"] = df[["in_qty", "out_qty"]].fillna(0).max(axis=1) df["is_outlier_qty"] = False for (sku, unit), g in df.groupby(["sku", "unit"]): flags = iqr_outlier_flags(g["qty_for_outlier"]) df.loc[g.index, "is_outlier_qty"] = flags df.loc[g.index[flags], "qc_flags"] = df.loc[g.index[flags], "qc_flags"].apply(lambda s: "|".join([x for x in [s, "qty_outlier"] if x]))

日期无效标记

df["date_invalid"] = df["tx_date"].isna() df.loc[df["date_invalid"], "qc_flags"] = df.loc[df["date_invalid"], "qc_flags"].apply(lambda s: "|".join([x for x in [s, "date_invalid"] if x]))

异币种标记与可选转换

df["currency_mismatch"] = df["currency"] != BASE_CURRENCY if ENABLE_FX: for idx, r in df.iterrows(): cur = r["currency"] if cur != BASE_CURRENCY and isinstance(cur, str) and cur in FX_RATE: rate = FX_RATE[cur] # 仅计算一个本币金额参考,不覆盖原始金额 if not pd.isna(r["total_cost"]): add_note(idx, f"按汇率{cur}->{BASE_CURRENCY}={rate} 折算本币金额供参考") df.at[idx, "total_cost_in_base"] = r["total_cost"] * rate elif cur != BASE_CURRENCY: add_note(idx, f"异币种未转换,缺少汇率: {cur}")

单位混用(按SKU)

mixed_unit_skus = [] unit_map = df.groupby("sku")["unit"].nunique() for sku, nunique in unit_map.items(): if nunique > 1: mixed_unit_skus.append(sku) before_report["unit_mixed_by_sku"] = len(mixed_unit_skus)

拼写疑似错误(启发式:包含非常用汉字或疑似错别字“誰”等),仅标记

df["spelling_suspect"] = df["sku_name"].apply(lambda s: isinstance(s, str) and ("誰" in s or "?" in s)) df.loc[df["spelling_suspect"], "qc_flags"] = df.loc[df["spelling_suspect"], "qc_flags"].apply(lambda s: "|".join([x for x in [s, "spelling_suspect"] if x]))

----------------------------

5) 重复记录合并(按record_id)

----------------------------

dup_mask = df.duplicated(subset=["record_id"], keep=False) duplicate_ids = df.loc[dup_mask, "record_id"].unique().tolist() dup_count = int(df.duplicated(subset=["record_id"]).sum())

def merge_duplicates(group: pd.DataFrame) -> pd.Series: # 记录合并策略:保留第一条,备注合并去重拼接;如关键字段冲突,记录conflict标记 g = group.sort_values(by=["tx_date_raw"], kind="stable") first = g.iloc[0].copy() conflicts = [] # 合并备注 remarks = [x for x in g["remark"].tolist() if isinstance(x, str) and x != ""] if remarks: merged_remark = " | ".join(pd.unique(remarks)) first["remark"] = merged_remark first["qc_flags"] = "|".join([x for x in [first["qc_flags"], "merged_duplicate"] if x]) first["notes"] = " ; ".join([x for x in [first["notes"], f"合并{len(g)-1}条重复备注"] if x]) # 关键字段一致性检查 key_cols = ["tx_date","sku","sku_name","warehouse","in_qty","out_qty","unit","unit_cost","currency","total_cost","batch"] for c in key_cols: vals = pd.unique(g[c].astype(str)) if len(vals) > 1: conflicts.append(c) if conflicts: first["qc_flags"] = "|".join([x for x in [first["qc_flags"], "duplicate_conflict"] if x]) first["notes"] = " ; ".join([x for x in [first["notes"], f"重复记录字段不一致: {','.join(conflicts)}"] if x]) return first

if dup_count > 0: df = df.groupby("record_id", as_index=False, group_keys=False).apply(merge_duplicates) df = df.reset_index(drop=True)

----------------------------

6) 清洗后统计与输出

----------------------------

after_report = { "rows": int(len(df)), "missing": { # 对字符串型列,统计空字符串;对数值与日期,统计NaN或NaT "in_qty": int(df["in_qty"].isna().sum()), "out_qty": int(df["out_qty"].isna().sum()), "unit_cost": int(df["unit_cost"].isna().sum()), "total_cost": int(df["total_cost"].isna().sum()), "tx_date_invalid": int(df["tx_date"].isna().sum()), "batch_missing": int((df["batch"]=="").sum()), }, "duplicates_by_record_id": int(df.duplicated(subset=["record_id"]).sum()), "amount_corrected": int((df["qc_flags"].str.contains("total_cost_corrected", na=False)).sum()), "in_qty_inferred": int((df["qc_flags"].str.contains("in_qty_inferred", na=False)).sum()), "unit_mixed_skus": mixed_unit_skus, "qty_outliers": int(df["is_outlier_qty"].sum()), "currency_mismatch": int((df["currency_mismatch"]).sum()), "mojibake_fixed": int((df["sku_name_raw"] != df["sku_name"]).sum()), "spelling_suspect": int(df["spelling_suspect"].sum()), }

quality_report = { "before": before_report, "after": after_report, "notes": { "base_currency": BASE_CURRENCY, "fx_enabled": ENABLE_FX, "fx_rate_available": list(FX_RATE.keys()) } }

输出清洗后的CSV(保留原始副本列,替换标准化后的主字段,保证可追溯)

df_export = df.copy()

将日期标准化为YYYY-MM-DD字符串输出

df_export["tx_date"] = df_export["tx_date"].dt.strftime("%Y-%m-%d")

优先输出主业务列+追溯列+质检列

preferred_cols = [ "record_id","tx_date","sku","sku_name","warehouse","in_qty","out_qty","unit","unit_cost","currency","total_cost","batch","remark", "tx_date_raw","sku_name_raw","unit_cost_raw","total_cost_raw","currency_raw","unit_raw2","remark_raw", "qc_flags","notes" ] for c in preferred_cols: if c not in df_export.columns: df_export[c] = None

df_export = df_export[preferred_cols]

保存

df_export.to_csv(CLEANED_PATH, index=False, encoding="utf-8-sig") with open(REPORT_PATH, "w", encoding="utf-8") as f: json.dump(quality_report, f, ensure_ascii=False, indent=2)

log.info("清洗完成:%s", CLEANED_PATH) log.info("质量报告:%s", REPORT_PATH) log.info("日志:%s", LOG_PATH)

控制台输出简要评估

print("\n=== 清洗效果摘要 ===") print(json.dumps(quality_report, ensure_ascii=False, indent=2)) print(f"\n输出文件: {CLEANED_PATH}") print(f"日志: {LOG_PATH}") print(f"质量报告: {REPORT_PATH}")

清洗后的CSV数据路径 ./output/cleaned.csv

清洗效果评估 以下为基于你提供的数据样例(运行脚本即可得到同等或更详尽的报告):

关键指标对比(预期/示例)

  • 记录数:原始 13 条 -> 清洗后 12 条(合并1个重复record_id r007)
  • 缺失值
    • in_qty:原始 1 条(r002)-> 清洗后 0 条(由 total_cost/unit_cost 推导为 10)
    • unit_cost/total_cost:原始存在符号或格式异常 -> 清洗后均已数值化
  • 日期
    • 格式不一致:已统一为 YYYY-MM-DD
    • 日期错误:1 条(r004: 2024-03-33)无法可靠修正,已标记 date_invalid
  • 金额异常
    • 原始 1 条(r004: total_cost=5 与30*1.8不符)
    • 清洗后 0 条(已按unit_cost*数量修正为54并打标 total_cost_corrected)
  • 重复记录:原始 1 组(r007)-> 清洗后 0;备注合并为“正常 | 重复记录”
  • 文本/编码
    • 单价含货币符号:已清理(如“¥2.5”->2.5)
    • 文本乱码:1 条(r012)尝试修复(保留原值副本 sku_name_raw)
    • 破折号全角:已统一(“牛奶—原味”->“牛奶-原味”)
    • SKU名称“ML/ml”大小写:已统一为“ml”
  • 单价格式异常:如“2.”->2.0,已修正
  • 币种不同:1 条(USD)已标记 currency_mismatch;默认不执行汇率换算(可参数化启用)
  • 单位不统一(按SKU):S1001 同时出现“瓶/箱”,已识别为混用并标记在报告中;未做换算(缺少权威换算关系)
  • 数量异常(IQR):如 r005 的 5000 标记为 qty_outlier,但不改值(金额匹配,视为大额业务发生)

业务价值分析

  • 提升金额准确性:自动修正金额异常,避免后续成本/利润分析偏差
  • 提升一致性与可用性:日期、数值、文本统一标准,便于汇总与对账
  • 降低人工处理成本:自动发现并合并重复、修复字段错位(备注含逗号未引用)
  • 可追溯治理:关键字段保留原始副本与修正标记,满足审计和数据治理要求
  • 风险可见:对币种、单位混用、疑似错拼、极端数量进行显式标记,辅助业务复核

优化建议

  • 源头治理
    • 强制CSV输出使用标准引用规则:包含逗号的文本字段需使用双引号包裹
    • 定义主键唯一性:record_id 严格唯一,重复应在源系统拦截
    • 统一日期格式为YYYY-MM-DD;禁止非法日期写入
    • 维护“SKU-基础单位”权威字典与单位换算表(如 S1001: 箱->瓶 的转换率)
    • 维护币种与可信汇率表,支持按账期(tx_date)进行本币折算
  • 清洗规则深化
    • 对日期无法解析但批次为YYYYMMDD者,允许回填日期(脚本已支持fallback)
    • 引入更细粒度的异常检测(按SKU季节性阈值/滑动窗口)但需有业务确认流程
    • 对疑似错别字建立人工校对清单与自动纠正白名单
  • 监控与审计
    • 定期产出质量KPI(缺失率、重复率、异常率、单位混用SKU数)
    • 将清洗日志与质量报告纳入数据质量监控看板并设置告警阈值
    • 建立“修正回滚”机制:保留原始值与修正值,支持按record_id回退

说明与合规

  • 未删除或篡改关键业务字段:所有被修正的字段均保留 raw 副本和 qc_flags/notes 标记
  • 敏感数据:脚本不外传数据、不调用外部服务
  • 单位换算与汇率转换:因缺乏权威映射与可信汇率,默认不自动换算,仅标记并提供可配置入口
  • 异常修正:仅在有确定规则时修正(如金额=单价*数量),无法确定的(如非法日期)仅标记不猜测

如需我将“单位换算表/汇率”接入并自动转换,请提供映射表(例如:S1001 箱->瓶=12;USD->CNY=7.20,生效区间)或允许我生成参数化模板供你维护。
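作为参考,下面给出一个参数化模板草案,演示单位换算表与分期汇率可以采用的结构(其中 S1001 箱->瓶=12、USD->CNY=7.20 及生效区间均为示例假设,需业务/财务确认后替换):

# conversion_config.py —— 参数化模板草案,数值均为假设示例
UNIT_CONVERSION = {
    # (sku, 原单位) -> (基础单位, 换算系数):假设 1 箱 = 12 瓶
    ("S1001", "箱"): ("瓶", 12),
}
FX_RATES = [
    # (币种, 生效起, 生效止, 兑CNY汇率) —— 区间与汇率仅为示例
    ("USD", "2024-03-01", "2024-03-31", 7.20),
]

def convert_unit(sku: str, unit: str, qty: float):
    """若配置了换算关系,返回(基础单位, 折算数量);否则原样返回。"""
    base_unit, factor = UNIT_CONVERSION.get((sku, unit), (unit, 1))
    return base_unit, qty * factor

# 示例:r002 的 10 箱 S1001 按假设系数折算为 120 瓶
print(convert_unit("S1001", "箱", 10))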

示例详情

解决的问题

为商业团队提供一套可直接上手的“数据清洗专家”提示词,让任何人都能快速产出可执行、可审计的清洗方案。通过专家化角色与闭环流程,自动完成数据质量体检、清洗规则设计、处理步骤规划、结果验证与优化建议,帮助你把杂乱数据迅速转化为可信的分析资产。核心价值:提升数据可用性与可信度、缩短项目周期、降低人力与沟通成本,支持销售、用户行为、库存、财务等常见业务场景与多种数据源格式。试用即得:只需输入数据源类型、主要问题、业务用途与质量要求,即刻生成清洗报告与可执行流程,助你从“先清洗后分析”迈向“清洗即分析”的高效协作。

适用用户

数据分析师

用本提示快速完成数据质量评估与预处理,生成可视化清洗报告,提升模型训练与报表结论的可信度。

增长运营经理

整合多渠道行为数据,一键统一时间与用户标识,去重与异常剔除,迅速产出可对比的转化与留存指标。

电商商品运营

规范SKU、品牌与单位,修正价格与库存异常,清理重复商品档案,保障销量分析与补货决策准确。

特征总结

一键评估数据质量,自动识别缺失、重复、格式冲突等问题并给出优先级
面向业务目标的清洗规则库,按销售、行为、库存、财务场景快速套用
分步执行流程可视化,关键节点质检与回溯,保障结果可验证与可追溯
多数据源轻松接入,自动统一字段格式、编码体系与计量单位,减少对齐耗时
智能异常值检测与修正,兼顾业务影响与数据完整性,并保留可追溯标记,全流程留痕
自动生成清洗报告与指标对比,量化质量提升与分析可信度,便于汇报复盘
可定制化脚本与参数模板,一键复用不同项目的清洗方案,降低重复劳动
持续监控与优化建议,帮助建立合规的数据治理闭环与预警,防止质量反弹
错误处理与日志记录完善,异常情况自动提示并提供修复路径,保障业务安全

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥20.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 765 tokens
- 5 个可调节参数
{ 数据质量问题 } { 质量要求 } { 输入CSV数据 } { 清洗方法偏好 } { 异常值处理策略 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
限时免费
