Create a Data Cleaning Script

Updated Sep 27, 2025

Generates a Python script for cleaning missing values in a specified dataset.

Example 1

Below is a reusable Python script for cleaning and imputing missing values in e-commerce sales data. It generates several imputation variants in one run: mean, median, mode, group-wise, and regression. Tailored to this dataset, it handles the mixed use of 0 and missing in qty/price, the relatively high missing rate in age, and the small number of blanks in payment_method, and it provides engineered templates for group-wise and regression imputation so it can be reused on other datasets.

Script notes
- Expected input columns: [id, order_date, sku, qty, price, discount, customer_id, city, age, payment_method]
- Missing-value normalization: empty strings, whitespace-only strings, and common missing markers are unified to NaN; optionally, 0 in qty/price can be treated as missing.
- Imputation variants:
  - Mean: numeric columns filled with the mean; categorical columns with the mode.
  - Median: numeric columns filled with the median; categorical columns with the mode.
  - Mode: both numeric and categorical columns filled with the mode.
  - Group-wise: qty/price use the within-sku median; age uses the within-city median; payment_method uses the within-city mode; each with a global fallback.
  - Regression: independent regression models (RandomForestRegressor) for age/price/qty, using order-time features plus sku/city/payment_method as predictors; payment_method itself is still filled with the mode.
- Output: one CSV per imputation variant.
- Configurable: which columns treat zero as missing, strategy selection, output directory, and more.

Code
Save the following as impute_sales.py and install the dependencies: pip install pandas numpy scikit-learn

```python
# impute_sales.py
# -*- coding: utf-8 -*-
"""
Missing-value cleaning and multi-strategy imputation for an e-commerce sales table.
Supported strategies: mean / median / mode / group-wise / regression.
"""

import argparse
import os
from typing import Dict, List

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder


DEFAULT_NA_VALUES = ["", " ", "  ", "NA", "N/A", "NULL", "null", "None", "none"]


def parse_args():
    parser = argparse.ArgumentParser(description="Missing-value cleaning and multi-strategy imputation for an e-commerce sales table")
    parser.add_argument("--input", type=str, required=True, help="输入CSV文件路径")
    parser.add_argument("--output-dir", type=str, required=True, help="输出目录")
    parser.add_argument(
        "--strategies",
        type=str,
        default="all",
        help="填补策略,逗号分隔:mean,median,mode,group,regression 或 all",
    )
    parser.add_argument(
        "--zero-as-missing",
        type=str,
        default="qty,price",
        help="将0视为缺失的列,逗号分隔;为空表示不启用",
    )
    parser.add_argument(
        "--random-state",
        type=int,
        default=42,
        help="随机种子(回归模型)",
    )
    return parser.parse_args()


def load_data(path: str, na_values: List[str]) -> pd.DataFrame:
    df = pd.read_csv(path, na_values=na_values, dtype=str)
    # Strip leading/trailing whitespace so missing markers are recognized consistently.
    # Values are read as strings or NaN; .str.strip() preserves NaN (astype(str) would turn NaN into the string "nan").
    for col in df.columns:
        df[col] = df[col].str.strip()
        df.loc[df[col] == "", col] = np.nan
    return df


def convert_types(df: pd.DataFrame) -> pd.DataFrame:
    # Type conversion: as strict as possible without raising
    out = df.copy()

    # Date
    out["order_date"] = pd.to_datetime(out["order_date"], errors="coerce")

    # Numeric columns
    for col in ["qty", "price", "discount", "age"]:
        out[col] = pd.to_numeric(out[col], errors="coerce")

    # Identifier columns stay as strings
    # Categorical columns stay as strings (may contain NaN)
    return out


def standardize_missing(df: pd.DataFrame, zero_as_missing_cols: List[str]) -> pd.DataFrame:
    out = df.copy()
    # Treat 0 as missing (optional)
    for col in zero_as_missing_cols:
        if col in out.columns and pd.api.types.is_numeric_dtype(out[col]):
            out.loc[out[col] == 0, col] = np.nan
    return out


def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["order_year"] = out["order_date"].dt.year
    out["order_month"] = out["order_date"].dt.month
    out["order_weekday"] = out["order_date"].dt.weekday
    return out


def fill_categorical_mode(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    out = df.copy()
    for col in cols:
        if col in out.columns:
            mode_vals = out[col].mode(dropna=True)
            if len(mode_vals) > 0:
                out[col] = out[col].fillna(mode_vals.iloc[0])
    return out


def impute_mean(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # Categorical columns: mode
    out = fill_categorical_mode(out, ["sku", "city", "payment_method"])
    # Numeric columns: mean
    for col in ["qty", "price", "discount", "age"]:
        if col in out.columns:
            mean_val = out[col].mean(skipna=True)
            out[col] = out[col].fillna(mean_val)
    return out


def impute_median(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out = fill_categorical_mode(out, ["sku", "city", "payment_method"])
    for col in ["qty", "price", "discount", "age"]:
        if col in out.columns:
            med_val = out[col].median(skipna=True)
            out[col] = out[col].fillna(med_val)
    return out


def impute_mode(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # Fill every column with its mode (numeric modes may not be unique; take the first)
    for col in out.columns:
        mode_vals = out[col].mode(dropna=True)
        if len(mode_vals) > 0:
            out[col] = out[col].fillna(mode_vals.iloc[0])
    return out


def impute_group(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()

    # payment_method: per-city mode
    if "payment_method" in out.columns and "city" in out.columns:
        out["payment_method"] = (
            out.groupby("city")["payment_method"]
            .transform(lambda s: s.fillna(s.mode().iloc[0] if len(s.mode()) > 0 else np.nan))
        )
        # Global fallback
        pm_mode = out["payment_method"].mode(dropna=True)
        if len(pm_mode) > 0:
            out["payment_method"] = out["payment_method"].fillna(pm_mode.iloc[0])

    # qty/price: per-sku median, with the global median as fallback
    for col in ["qty", "price"]:
        if col in out.columns and "sku" in out.columns:
            out[col] = out.groupby("sku")[col].transform(lambda s: s.fillna(s.median()))
            global_med = out[col].median(skipna=True)
            out[col] = out[col].fillna(global_med)

    # age: per-city median, with the global median as fallback
    if "age" in out.columns and "city" in out.columns:
        out["age"] = out.groupby("city")["age"].transform(lambda s: s.fillna(s.median()))
        age_med = out["age"].median(skipna=True)
        out["age"] = out["age"].fillna(age_med)

    # discount: fill any missing values with the overall median (no grouping specified)
    if "discount" in out.columns:
        out["discount"] = out["discount"].fillna(out["discount"].median(skipna=True))

    return out


def build_regression_imputer(
    df: pd.DataFrame,
    target: str,
    random_state: int = 42,
) -> Pipeline:
    """
    Build a regression-imputation pipeline for the target column.
    Numeric features: median imputation; categorical features: mode imputation + one-hot encoding.
    Model: random forest regression (reasonably robust to non-linearity and categorical features).
    """
    # Feature columns: exclude identifier columns and the target itself;
    # use order-time features and other interpretable variables
    time_features = ["order_year", "order_month", "order_weekday"]
    cat_features = [c for c in ["sku", "city", "payment_method"] if c in df.columns]
    num_features_all = [c for c in ["qty", "price", "discount", "age"] if c in df.columns]
    num_features = [c for c in num_features_all if c != target]

    # Assemble the column transformer
    transformers = []
    if len(num_features) > 0:
        transformers.append(("num", SimpleImputer(strategy="median"), num_features))
    if len(cat_features) > 0:
        transformers.append(
            (
                "cat",
                Pipeline(
                    steps=[
                        ("imputer", SimpleImputer(strategy="most_frequent")),
                        ("ohe", OneHotEncoder(handle_unknown="ignore")),
                    ]
                ),
                cat_features,
            )
        )
    if len(time_features) > 0:
        transformers.append(("time", SimpleImputer(strategy="median"), time_features))

    preprocessor = ColumnTransformer(transformers=transformers, remainder="drop")

    model = RandomForestRegressor(
        n_estimators=200,
        max_depth=None,
        random_state=random_state,
        n_jobs=-1,
    )

    pipe = Pipeline(steps=[("preprocess", preprocessor), ("model", model)])
    return pipe


def impute_regression(df: pd.DataFrame, random_state: int = 42) -> pd.DataFrame:
    out = df.copy()

    # Fill categorical columns first: mode for payment_method, so modeling features contain no missing values
    out = fill_categorical_mode(out, ["payment_method", "sku", "city"])

    # Time features
    out = add_time_features(out)

    # Simple fill for discount so it is not heavily missing when used as a feature
    if "discount" in out.columns:
        out["discount"] = out["discount"].fillna(out["discount"].median(skipna=True))

    targets = [c for c in ["age", "price", "qty"] if c in out.columns]

    for target in targets:
        # Build and train the regression model
        pipe = build_regression_imputer(out, target=target, random_state=random_state)

        mask_train = out[target].notna()
        mask_pred = out[target].isna()

        if mask_train.sum() == 0 or mask_pred.sum() == 0:
            # Nothing to train on, or nothing to impute
            continue

        X_train = out.loc[mask_train]
        y_train = out.loc[mask_train, target]
        X_pred = out.loc[mask_pred]

        pipe.fit(X_train, y_train)
        y_hat = pipe.predict(X_pred)

        # Sanity bounds: non-negative; age clipped to 0-100 (adjust if needed)
        if target in ["qty", "price"]:
            y_hat = np.maximum(y_hat, 0.0)
        if target == "age":
            y_hat = np.clip(y_hat, 0.0, 100.0)

        out.loc[mask_pred, target] = y_hat

    return out


def generate_outputs(df: pd.DataFrame, strategies: List[str], out_dir: str, random_state: int):
    os.makedirs(out_dir, exist_ok=True)
    results: Dict[str, pd.DataFrame] = {}

    if "mean" in strategies:
        results["mean"] = impute_mean(df)
    if "median" in strategies:
        results["median"] = impute_median(df)
    if "mode" in strategies:
        results["mode"] = impute_mode(df)
    if "group" in strategies:
        results["group"] = impute_group(df)
    if "regression" in strategies:
        results["regression"] = impute_regression(df, random_state=random_state)

    # Save outputs
    for name, dfi in results.items():
        out_path = os.path.join(out_dir, f"{name}_imputed.csv")
        dfi.to_csv(out_path, index=False)
        # Missingness report
        miss_report = dfi.isna().sum()
        print(f"[{name}] 保存至: {out_path}")
        print(f"[{name}] 每列剩余缺失数:")
        print(miss_report.to_string())
        print("-" * 60)
    return results


def main():
    args = parse_args()
    strategies = (
        ["mean", "median", "mode", "group", "regression"]
        if args.strategies.strip().lower() == "all"
        else [s.strip().lower() for s in args.strategies.split(",")]
    )

    zero_cols = [c.strip() for c in args.zero_as_missing.split(",") if c.strip()]

    # Load and standardize
    df = load_data(args.input, na_values=DEFAULT_NA_VALUES)
    df = convert_types(df)
    df = standardize_missing(df, zero_as_missing_cols=zero_cols)

    # Generate all selected imputation variants in one run
    generate_outputs(df, strategies=strategies, out_dir=args.output_dir, random_state=args.random_state)


if __name__ == "__main__":
    main()
```

Usage examples
- Generate all variants in one run (by default, 0 in qty and price is treated as missing):
  - python impute_sales.py --input sales.csv --output-dir clean/ --strategies all
- Generate only the group-wise and regression variants:
  - python impute_sales.py --input sales.csv --output-dir clean/ --strategies group,regression
- Do not treat 0 as missing:
  - python impute_sales.py --input sales.csv --output-dir clean/ --zero-as-missing ""

Implementation notes and recommendations
- The "0 vs. missing" ambiguity in qty/price is handled by the configurable zero_as_missing option, so free items or legitimate business zeros are not misclassified as missing. Enable it only if your business rules call for it.
- Group-wise imputation uses within-group statistics plus a global fallback, which keeps it robust when a group has too few samples. Current defaults: qty/price use the per-sku median, while age and payment_method are grouped by city; adjust the grouping keys to match your business structure.
- Regression imputation builds an independent model per numeric column, with time features and one-hot encoded categorical features, which can improve quality when the missing rate is high. Random forest regression balances non-linearity and categorical handling; for higher accuracy you can swap in gradient boosting, XGBoost, or similar.
- The categorical column (payment_method) has few missing values here, so mode imputation is stable and cheap. If you later need to predict categories, extend the script with a classification model.
- Before using any variant in production, evaluate the generated outputs offline, for example by masking known values on a holdout set and comparing reconstruction error, and pick the variant that best matches your business metrics; see the sketch below.
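
As a rough illustration of the offline evaluation suggested above, the following sketch masks a fraction of known values, re-imputes them with one of the script's functions, and reports the mean absolute error on the masked cells. It assumes the functions from impute_sales.py are importable and that df is the standardized DataFrame; the helper name evaluate_imputer and the 10% mask rate are illustrative, not part of the original script.

```python
# Minimal masking-based evaluation sketch (assumes impute_sales.py and a prepared `df` are available).
import numpy as np
import pandas as pd

def evaluate_imputer(df: pd.DataFrame, column: str, impute_fn, mask_rate: float = 0.1, seed: int = 42) -> float:
    rng = np.random.default_rng(seed)
    known_idx = df.index[df[column].notna()]
    held_out = rng.choice(known_idx, size=max(1, int(len(known_idx) * mask_rate)), replace=False)

    masked = df.copy()
    masked.loc[held_out, column] = np.nan   # hide known values
    imputed = impute_fn(masked)             # e.g. impute_group or impute_regression from impute_sales.py

    truth = df.loc[held_out, column].astype(float)
    pred = imputed.loc[held_out, column].astype(float)
    return float((truth - pred).abs().mean())  # mean absolute error on the held-out cells

# Example comparison on the `age` column:
# mae_group = evaluate_imputer(df, "age", impute_group)
# mae_reg = evaluate_imputer(df, "age", impute_regression)
```

Lower error on the masked cells is a reasonable tie-breaker between variants, but the final choice should still be checked against downstream business metrics.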

The script can be reused as a template for other table schemas. For a different dataset you only need to:
- adjust the column-type handling and feature engineering;
- configure the grouping keys and model features;
- set zero_as_missing and the boundary clipping according to your business rules (a hypothetical adaptation is sketched below).
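
For instance, a minimal, hypothetical adaptation of the group-wise step to an inventory table might look like the following; the column names (warehouse, stock_qty, unit_cost) and the GROUP_RULES mapping are invented for illustration and are not part of the original dataset.

```python
# Hypothetical generalization of the group-wise imputation to another table schema.
import pandas as pd

GROUP_RULES = {
    "stock_qty": {"by": "warehouse", "stat": "median"},   # assumed columns, for illustration only
    "unit_cost": {"by": "warehouse", "stat": "median"},
}

def impute_group_generic(df: pd.DataFrame, rules: dict = GROUP_RULES) -> pd.DataFrame:
    out = df.copy()
    for col, rule in rules.items():
        if col not in out.columns or rule["by"] not in out.columns:
            continue
        # Within-group statistic first, then a global fallback, mirroring impute_group above
        out[col] = out.groupby(rule["by"])[col].transform(lambda s: s.fillna(s.agg(rule["stat"])))
        out[col] = out[col].fillna(out[col].agg(rule["stat"]))
    return out
```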

Example 2

Below is a self-contained Python script that performs a pre-ingestion missingness audit, imputes missing values according to the specified rules, generates a before/after comparison for changed fields, and outputs an anomalies list. It is designed for the daily transaction wide table with columns [txn_id, ts, branch, channel, amount, fee]. Imputation rules:
- amount and fee: group-wise imputation using branch + channel first, then branch-only, then channel-only, then global; default statistic is median (configurable).
- ts: forward-fill within each (branch, channel) group after sorting by ts ascending (no backfill).

This script also enforces consistent definitions and tracking:
- All imputations are sourced from non-missing values only.
- Imputation sources are recorded per field.
- Pre-ingestion missingness audit is persisted.
- Residual missing and duplicate txn_id records are flagged as anomalies.

You can run:
python clean_transactions.py --input /path/to/input.csv --outdir ./out --impute-stat median

Code:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import os
from typing import Tuple, Dict
import pandas as pd
import numpy as np


def compute_missingness(df: pd.DataFrame, cols: list) -> pd.DataFrame:
    total = len(df)
    audit = []
    for c in cols:
        nulls = df[c].isna().sum()
        audit.append({"column": c, "null_count": int(nulls), "total": int(total), "null_rate": float(nulls / total if total else np.nan)})
    return pd.DataFrame(audit)


def ensure_dtypes(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, pd.Series]]:
    original = {}
    for col in ["txn_id", "ts", "branch", "channel", "amount", "fee"]:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")

    # Preserve originals for anomaly detection
    for c in ["ts", "amount", "fee"]:
        original[f"orig_{c}"] = df[c].copy()

    # Coerce types
    df["txn_id"] = df["txn_id"].astype(str)
    df["branch"] = df["branch"].astype(str)
    df["channel"] = df["channel"].astype(str)

    df["ts"] = pd.to_datetime(df["ts"], errors="coerce", utc=False)
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    df["fee"] = pd.to_numeric(df["fee"], errors="coerce")

    return df, original


def build_group_stats(df: pd.DataFrame, stat: str = "median") -> Dict[str, pd.DataFrame]:
    agg_func = "median" if stat == "median" else "mean"  # string names avoid pandas deprecation warnings for numpy callables
    use_cols = ["amount", "fee"]

    g_bc = df.groupby(["branch", "channel"], dropna=False)[use_cols].agg(agg_func).rename(columns={
        "amount": "g_bc_amount", "fee": "g_bc_fee"
    })
    g_b = df.groupby(["branch"], dropna=False)[use_cols].agg(agg_func).rename(columns={
        "amount": "g_b_amount", "fee": "g_b_fee"
    })
    g_c = df.groupby(["channel"], dropna=False)[use_cols].agg(agg_func).rename(columns={
        "amount": "g_c_amount", "fee": "g_c_fee"
    })
    g_all = df[use_cols].agg(agg_func).to_dict()  # {"amount": x, "fee": y}

    return {"g_bc": g_bc, "g_b": g_b, "g_c": g_c, "g_all": g_all}


def impute_amount_fee(df: pd.DataFrame, stat: str = "median") -> pd.DataFrame:
    stats = build_group_stats(df, stat=stat)

    df = df.merge(stats["g_bc"], how="left", left_on=["branch", "channel"], right_index=True)
    df = df.merge(stats["g_b"], how="left", on="branch")
    df = df.merge(stats["g_c"], how="left", on="channel")

    # Prepare targets
    df["amount_impute_source"] = None
    df["fee_impute_source"] = None

    # Amount
    amount_filled = df["amount"].copy()
    src = pd.Series(index=df.index, dtype=object)

    mask = amount_filled.isna() & df["g_bc_amount"].notna()
    amount_filled[mask] = df.loc[mask, "g_bc_amount"]
    src[mask] = "branch+channel"

    mask = amount_filled.isna() & df["g_b_amount"].notna()
    amount_filled[mask] = df.loc[mask, "g_b_amount"]
    src[mask] = "branch"

    mask = amount_filled.isna() & df["g_c_amount"].notna()
    amount_filled[mask] = df.loc[mask, "g_c_amount"]
    src[mask] = "channel"

    if np.isfinite(stats["g_all"]["amount"]) and not pd.isna(stats["g_all"]["amount"]):
        mask = amount_filled.isna()
        amount_filled[mask] = stats["g_all"]["amount"]
        src[mask] = "global"

    df["amount_impute_source"] = src.where(df["amount"].isna(), None)
    df["amount_filled"] = amount_filled
    df["amount_was_imputed"] = df["amount"].isna() & df["amount_filled"].notna()

    # Fee
    fee_filled = df["fee"].copy()
    src = pd.Series(index=df.index, dtype=object)

    mask = fee_filled.isna() & df["g_bc_fee"].notna()
    fee_filled[mask] = df.loc[mask, "g_bc_fee"]
    src[mask] = "branch+channel"

    mask = fee_filled.isna() & df["g_b_fee"].notna()
    fee_filled[mask] = df.loc[mask, "g_b_fee"]
    src[mask] = "branch"

    mask = fee_filled.isna() & df["g_c_fee"].notna()
    fee_filled[mask] = df.loc[mask, "g_c_fee"]
    src[mask] = "channel"

    if np.isfinite(stats["g_all"]["fee"]) and not pd.isna(stats["g_all"]["fee"]):
        mask = fee_filled.isna()
        fee_filled[mask] = stats["g_all"]["fee"]
        src[mask] = "global"

    df["fee_impute_source"] = src.where(df["fee"].isna(), None)
    df["fee_filled"] = fee_filled
    df["fee_was_imputed"] = df["fee"].isna() & df["fee_filled"].notna()

    return df


def impute_ts_ffill(df: pd.DataFrame) -> pd.DataFrame:
    # Forward fill within (branch, channel), sorted by ts ascending, then txn_id for deterministic order.
    # Rows with a missing ts sort after the observed timestamps of their group (na_position="last"), so they
    # inherit the latest observed value; groups with no observed ts remain missing (no backfill).
    order_cols = ["branch", "channel", "ts", "txn_id"]
    df_sorted = df.sort_values(order_cols, na_position="last").copy()
    df_sorted["ts_filled"] = df_sorted.groupby(["branch", "channel"], dropna=False)["ts"].ffill()

    # Assignment aligns on the original index, restoring the input row order
    df["ts_filled"] = df_sorted["ts_filled"]
    df["ts_was_imputed"] = df["ts"].isna() & df["ts_filled"].notna()
    return df


def build_before_after(df: pd.DataFrame) -> pd.DataFrame:
    changes = []
    for col, filled_col, src_col in [
        ("amount", "amount_filled", "amount_impute_source"),
        ("fee", "fee_filled", "fee_impute_source"),
        ("ts", "ts_filled", None),
    ]:
        diff_mask = df[col].isna() & df[filled_col].notna()  # only rows where a missing value was actually imputed
        if diff_mask.any():
            tmp = pd.DataFrame({
                "txn_id": df.loc[diff_mask, "txn_id"].astype(str),
                "field": col,
                "before": df.loc[diff_mask, col].astype(str),
                "after": df.loc[diff_mask, filled_col].astype(str),
            })
            if src_col:
                tmp["impute_source"] = df.loc[diff_mask, src_col].astype(str)
            else:
                tmp["impute_source"] = "ffill"
            changes.append(tmp)
    if changes:
        return pd.concat(changes, ignore_index=True)
    else:
        return pd.DataFrame(columns=["txn_id", "field", "before", "after", "impute_source"])


def build_anomalies(df: pd.DataFrame, original: Dict[str, pd.Series]) -> pd.DataFrame:
    anomalies = []

    # Duplicated txn_id
    dups = df[df["txn_id"].duplicated(keep=False)].copy()
    if not dups.empty:
        dups["anomaly_type"] = "duplicate_txn_id"
        anomalies.append(dups[["txn_id", "anomaly_type"]])

    # Rows still missing after imputation in critical fields
    still_missing = df[df["ts_filled"].isna() | df["amount_filled"].isna() | df["fee_filled"].isna()].copy()
    if not still_missing.empty:
        still_missing["anomaly_type"] = "residual_missing_after_imputation"
        anomalies.append(still_missing[["txn_id", "anomaly_type"]])

    # Coercion-induced NaNs for ts/amount/fee
    for col, orig_col in [("ts", "orig_ts"), ("amount", "orig_amount"), ("fee", "orig_fee")]:
        coerced_nan_mask = original[orig_col].notna() & df[col].isna()
        if coerced_nan_mask.any():
            rows = df.loc[coerced_nan_mask, ["txn_id"]].copy()
            rows["anomaly_type"] = f"type_coercion_nan_{col}"
            anomalies.append(rows)

    if anomalies:
        out = pd.concat(anomalies, ignore_index=True).drop_duplicates()
        return out
    else:
        return pd.DataFrame(columns=["txn_id", "anomaly_type"])


def imputation_summary(df: pd.DataFrame) -> pd.DataFrame:
    rows = []
    for field, src_col, flag_col in [
        ("amount", "amount_impute_source", "amount_was_imputed"),
        ("fee", "fee_impute_source", "fee_was_imputed"),
    ]:
        # Distribution of sources
        src_counts = df.loc[df[flag_col], src_col].value_counts(dropna=False)
        for src, cnt in src_counts.items():
            rows.append({"field": field, "impute_source": src if src is not None else "none", "count": int(cnt)})
        # Total imputed
        rows.append({"field": field, "impute_source": "TOTAL_IMPUTED", "count": int(df[flag_col].sum())})
    # ts ffill count
    rows.append({"field": "ts", "impute_source": "ffill", "count": int(df["ts_was_imputed"].sum())})
    return pd.DataFrame(rows)


def main():
    parser = argparse.ArgumentParser(description="Clean daily transaction wide table: audit missingness, impute, and output artifacts.")
    parser.add_argument("--input", required=True, help="Input CSV path with columns [txn_id, ts, branch, channel, amount, fee]")
    parser.add_argument("--outdir", required=True, help="Output directory")
    parser.add_argument("--impute-stat", choices=["median", "mean"], default="median", help="Statistic used for amount/fee imputation")
    parser.add_argument("--encoding", default="utf-8", help="CSV encoding")
    parser.add_argument("--sep", default=",", help="CSV separator (default ',')")
    args = parser.parse_args()

    os.makedirs(args.outdir, exist_ok=True)

    # Load
    df_raw = pd.read_csv(args.input, encoding=args.encoding, sep=args.sep, dtype=str)
    # Preserve a working numeric/date copy
    df = df_raw.copy()

    # Pre-ingestion missingness audit (before any coercion/imputation)
    pre_ingestion_audit = compute_missingness(df, ["txn_id", "ts", "branch", "channel", "amount", "fee"])
    pre_ingestion_audit.to_csv(os.path.join(args.outdir, "audit_missingness_before.csv"), index=False)

    # Type coercion and validation
    df, original = ensure_dtypes(df)

    # Impute amount/fee
    df = impute_amount_fee(df, stat=args.impute_stat)

    # Forward fill ts within (branch, channel)
    df = impute_ts_ffill(df)

    # Build comparison table (before vs after) for changed values
    before_after = build_before_after(df)
    before_after.to_csv(os.path.join(args.outdir, "before_after_changes.csv"), index=False)

    # Anomalies
    anomalies = build_anomalies(df, original)
    anomalies.to_csv(os.path.join(args.outdir, "anomalies.csv"), index=False)

    # Imputation summary
    summary = imputation_summary(df)
    summary.to_csv(os.path.join(args.outdir, "imputation_summary.csv"), index=False)

    # Cleaned dataset (with filled values and explicit lineage fields)
    cleaned = df.copy()
    cleaned["amount"] = cleaned["amount_filled"]
    cleaned["fee"] = cleaned["fee_filled"]
    cleaned["ts"] = cleaned["ts_filled"]
    # Keep key lineage columns; drop helper columns not needed downstream
    drop_cols = [
        "g_bc_amount", "g_bc_fee", "g_b_amount", "g_b_fee", "g_c_amount", "g_c_fee",
        "amount_filled", "fee_filled", "ts_filled",
    ]
    cleaned = cleaned.drop(columns=[c for c in drop_cols if c in cleaned.columns])
    cleaned.to_csv(os.path.join(args.outdir, "transactions_cleaned.csv"), index=False)

    # Post-processing missingness audit for verification of consistency
    post_audit = compute_missingness(cleaned, ["txn_id", "ts", "branch", "channel", "amount", "fee"])
    post_audit.to_csv(os.path.join(args.outdir, "audit_missingness_after.csv"), index=False)

    # Basic stdout log
    print("Completed.")
    print(f"Rows: {len(df)}")
    print(f"Pre-ingestion missingness audit: {os.path.join(args.outdir, 'audit_missingness_before.csv')}")
    print(f"Before/After changes: {os.path.join(args.outdir, 'before_after_changes.csv')}")
    print(f"Anomalies: {os.path.join(args.outdir, 'anomalies.csv')}")
    print(f"Imputation summary: {os.path.join(args.outdir, 'imputation_summary.csv')}")
    print(f"Cleaned dataset: {os.path.join(args.outdir, 'transactions_cleaned.csv')}")
    print(f"Post-imputation missingness audit: {os.path.join(args.outdir, 'audit_missingness_after.csv')}")


if __name__ == "__main__":
    pd.set_option("future.no_silent_downcasting", True)
    main()
```

Notes and rationale:
- Grouped imputation order for amount and fee ensures within-group consistency: branch+channel → branch → channel → global. The default statistic is median for robustness; switch to mean via --impute-stat if needed. A toy illustration of this cascade follows after these notes.
- Timestamp forward fill is constrained within (branch, channel) groups; rows are sorted by ts so that missing timestamps inherit the latest observed value in their group, while groups with no observed ts remain missing (no backfill) and show up in the anomalies list.
- The before/after comparison includes only fields that changed and records the imputation source (ffill for ts).
- Anomalies include duplicate txn_id, residual missing values after imputation, and rows where type coercion produced NaNs (indicating invalid source values).
- All files are written to the specified output directory to support auditability and consistent processing definitions.
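
As a quick, self-contained sanity check of the branch+channel → branch → global cascade (a simplified version that omits the channel-only step), the toy example below can be run separately from clean_transactions.py; all values are invented.

```python
# Toy illustration of the grouped-fallback idea on an in-memory frame.
import pandas as pd

toy = pd.DataFrame({
    "txn_id": ["t1", "t2", "t3", "t4"],
    "branch": ["B1", "B1", "B2", "B2"],
    "channel": ["web", "web", "app", "pos"],
    "amount": [100.0, None, 50.0, None],
})

# branch+channel median first, then branch-only, then the global median
bc = toy.groupby(["branch", "channel"])["amount"].transform("median")
b = toy.groupby("branch")["amount"].transform("median")
toy["amount_filled"] = toy["amount"].fillna(bc).fillna(b).fillna(toy["amount"].median())

print(toy[["txn_id", "amount", "amount_filled"]])
# t2 gets the (B1, web) median (100.0); t4 has no (B2, pos) statistic and falls back to the B2 branch median (50.0).
```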

Example 3

Below is a commented Python script that performs the following steps:
- Reads the "teaching operations" table and normalizes basic dtypes
- Profiles the missing values in each column (counts and rates)
- Trial-runs three imputation strategies: mean/mode, mode only, and linear interpolation
- Compares how each strategy affects key metrics (ARPU mean/std, plan distribution, churn rate, median login delay)
- Reports how many missing cells each strategy filled and how the results differ from the baseline

Notes:
- Assumed columns: [user_id, signup_day, last_login, plan, arpu, churn_flag]
- Date interpolation converts dates to integer timestamps and interpolates linearly after sorting by time (demonstration only; interpolating dates across user rows requires business judgment)
- Categorical variables are not suited to linear interpolation, so forward/backward fill with a mode fallback is used instead
- churn_flag is expected to be binary {0, 1}

Change the INPUT_PATH file path as needed.

The code follows:

```python
import pandas as pd
import numpy as np

# =========================
# Configuration
# =========================
INPUT_PATH = "path/to/jiaoxue_yunying.csv"  # change to the actual path
OUTPUT_PREFIX = "imputed_teaching_ops"      # output filename prefix; change as needed
pd.set_option("display.width", 120)
pd.set_option("display.max_columns", 20)

# =========================
# Utility functions
# =========================
def coerce_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize column dtypes:
    - user_id is an identifier: cast to string; rows with a missing user_id cannot be linked to a user and are dropped
    - signup_day/last_login: cast to datetime64[ns]
    - plan: keep as string/categorical
    - arpu: cast to float
    - churn_flag: cast to pandas nullable Int64, accepting only {0, 1}
    """
    df = df.copy()

    # Identifier: cast to string
    if "user_id" in df.columns:
        df["user_id"] = df["user_id"].astype(str).str.strip()
        # Treat empty strings as missing
        df.loc[df["user_id"].isin(["", "nan", "None"]), "user_id"] = np.nan

    # Date columns
    for col in ["signup_day", "last_login"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Categorical column
    if "plan" in df.columns:
        df["plan"] = df["plan"].astype(str).str.strip()
        df.loc[df["plan"].isin(["", "nan", "None"]), "plan"] = np.nan

    # Numeric column
    if "arpu" in df.columns:
        df["arpu"] = pd.to_numeric(df["arpu"], errors="coerce")

    # Binary flag: accept only {0, 1}
    if "churn_flag" in df.columns:
        s = df["churn_flag"]
        s = s.replace({
            True: 1, False: 0,
            "True": 1, "False": 0,
            "true": 1, "false": 0,
            "1": 1, "0": 0
        })
        s = pd.to_numeric(s, errors="coerce")
        s = s.where(s.isin([0, 1]), np.nan)
        df["churn_flag"] = s.astype("Int64")

    # Drop rows with a missing user_id (cannot be linked to a user)
    if "user_id" in df.columns:
        before = len(df)
        df = df[~df["user_id"].isna()].copy()
        after = len(df)
        if before != after:
            print(f"已剔除缺失 user_id 的行数:{before - after}")

    return df


def missing_overview(df: pd.DataFrame) -> pd.DataFrame:
    """
    Return a missingness overview (count and rate per column).
    """
    na_count = df.isna().sum()
    na_pct = df.isna().mean().round(4)
    overview = pd.DataFrame({"missing_count": na_count, "missing_pct": na_pct})
    return overview


def series_mode_safe(s: pd.Series):
    """
    Safe mode: if multiple modes exist, take the first; if no non-missing mode exists (e.g. the column is entirely missing), fall back to the median for numeric columns and keep missing for non-numeric ones.
    """
    modes = s.mode(dropna=True)
    if len(modes) > 0:
        return modes.iloc[0]
    # Fallback when no mode exists
    if pd.api.types.is_numeric_dtype(s):
        return s.median(skipna=True)
    else:
        return np.nan  # do not force a fallback for non-numeric columns


def interpolate_datetime_globally(s: pd.Series) -> pd.Series:
    """
    Linearly interpolate a datetime column (for demonstration only):
    - convert timestamps to int64 nanoseconds, mapping NaT to NaN
    - sort by timestamp, interpolate linearly, then map back to the original index
    Note: global interpolation across users imposes a temporal-structure assumption and is only meant for demos
    with few missing values; in production, interpolate within the same entity or along a proper time series.
    """
    s = s.copy()
    # Convert to nanosecond timestamps via numpy (NaT becomes the int64 sentinel), then mask NaT as NaN;
    # Series.astype("int64") would raise on NaT in recent pandas versions.
    ts = pd.Series(s.values.astype("int64"), index=s.index, dtype="float64")
    ts = ts.where(s.notna(), np.nan)

    # Interpolate in timestamp order
    order_idx = s.sort_values().index  # NaT sorts last by default
    ts_sorted = ts.loc[order_idx]
    ts_interp_sorted = ts_sorted.interpolate(method="linear", limit_direction="both")

    # Map back to the original index order
    ts_interp = ts_interp_sorted.reindex(s.index)

    # Convert back to datetime
    s_interp = pd.to_datetime(ts_interp, unit="ns")
    return s_interp


def compute_metrics(df: pd.DataFrame) -> dict:
    """
    Compute the key metrics used for comparison:
    - arpu_mean, arpu_std
    - churn_rate (mean of churn_flag)
    - plan_dist (normalized plan distribution)
    - median_login_delay_days (median of last_login - signup_day, in days)
    """
    metrics = {}
    # ARPU
    if "arpu" in df.columns:
        metrics["arpu_mean"] = df["arpu"].mean(skipna=True)
        metrics["arpu_std"] = df["arpu"].std(skipna=True)

    # Churn rate
    if "churn_flag" in df.columns:
        # The mean of a nullable integer column skips missing values by default
        metrics["churn_rate"] = df["churn_flag"].mean(skipna=True)

    # Plan distribution (normalized)
    if "plan" in df.columns:
        plan_dist = df["plan"].value_counts(normalize=True, dropna=True)
        # Convert to a dict for easier printing
        metrics["plan_dist"] = plan_dist.to_dict()

    # Median login delay in days
    if set(["signup_day", "last_login"]).issubset(df.columns):
        mask = df["signup_day"].notna() & df["last_login"].notna()
        if mask.any():
            deltas = (df.loc[mask, "last_login"] - df.loc[mask, "signup_day"]).dt.days
            metrics["median_login_delay_days"] = float(deltas.median())
        else:
            metrics["median_login_delay_days"] = np.nan

    return metrics


def print_metrics_diff(title: str, base: dict, new: dict):
    """
    Print the differences between two sets of metrics.
    """
    print(f"\n=== {title}: metrics vs. baseline ===")
    keys = sorted(set(base.keys()) | set(new.keys()))
    for k in keys:
        if k == "plan_dist":
            print(f"[{k}]")
            base_dist = base.get(k, {})
            new_dist = new.get(k, {})
            all_plans = sorted(set(base_dist.keys()) | set(new_dist.keys()))
            for p in all_plans:
                b = base_dist.get(p, 0.0)
                n = new_dist.get(p, 0.0)
                diff = n - b
                print(f"  - {p}: 基线={b:.4f}, 新值={n:.4f}, 差异={diff:+.4f}")
        else:
            b = base.get(k, np.nan)
            n = new.get(k, np.nan)
            if pd.notna(b) and pd.notna(n):
                diff = n - b
                print(f"[{k}] 基线={b:.6f}, 新值={n:.6f}, 差异={diff:+.6f}")
            else:
                print(f"[{k}] 基线={b}, 新值={n}, 差异=不可计算")


# =========================
# Imputation strategies
# =========================
def impute_mean_mode(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strategy 1: mean for numeric columns; mode for categorical/date columns.
    """
    out = df.copy()

    # Numeric column: arpu -> mean
    if "arpu" in out.columns:
        mean_val = out["arpu"].mean(skipna=True)
        out["arpu"] = out["arpu"].fillna(mean_val)

    # Binary column: churn_flag -> mode
    if "churn_flag" in out.columns:
        mode_val = series_mode_safe(out["churn_flag"])
        out["churn_flag"] = out["churn_flag"].fillna(mode_val)

    # Categorical column: plan -> mode
    if "plan" in out.columns:
        mode_val = series_mode_safe(out["plan"])
        out["plan"] = out["plan"].fillna(mode_val)

    # Date columns: mode (the most frequent date)
    for col in ["signup_day", "last_login"]:
        if col in out.columns:
            mode_val = series_mode_safe(out[col])
            out[col] = out[col].fillna(mode_val)

    return out


def impute_mode_only(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strategy 2: fill every column with its mode (numeric columns included; if no mode exists, fall back to the median or keep missing).
    """
    out = df.copy()
    for col in out.columns:
        mode_val = series_mode_safe(out[col])
        out[col] = out[col].fillna(mode_val)
    return out


def impute_linear_interpolation(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strategy 3: linear interpolation for continuous columns; forward/backward fill with a mode fallback for discrete columns.
    - arpu: linear interpolation
    - signup_day/last_login: linear interpolation on the timestamp scale (demonstration only)
    - plan/churn_flag: forward fill, then backward fill, then mode as a last resort
    """
    out = df.copy()

    # Continuous: linear interpolation of arpu (in original index order)
    if "arpu" in out.columns:
        out["arpu"] = out["arpu"].interpolate(method="linear", limit_direction="both")

    # Date columns: global linear interpolation over time (demo only; use with caution)
    for col in ["signup_day", "last_login"]:
        if col in out.columns:
            out[col] = interpolate_datetime_globally(out[col])

    # Discrete: ffill/bfill first, then mode as a fallback
    for col in ["plan", "churn_flag"]:
        if col in out.columns:
            out[col] = out[col].ffill().bfill()
            mode_val = series_mode_safe(out[col])
            out[col] = out[col].fillna(mode_val)

    return out


def count_filled_cells(original: pd.DataFrame, imputed: pd.DataFrame) -> pd.Series:
    """
    Count the imputed cells per column (original missing minus remaining missing).
    """
    return original.isna().sum() - imputed.isna().sum()


# =========================
# Main flow
# =========================
def main():
    # Read data
    df = pd.read_csv(INPUT_PATH)

    # Normalize dtypes and do basic cleaning
    df = coerce_dtypes(df)

    # Check that all required columns are present
    required_cols = ["user_id", "signup_day", "last_login", "plan", "arpu", "churn_flag"]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        raise ValueError(f"缺失必要列:{missing_cols}")

    # Missingness overview
    print("\n=== Missingness overview (after basic cleaning) ===")
    overview = missing_overview(df)
    print(overview)

    # Baseline metrics (no imputation; computed from the existing non-missing values)
    base_metrics = compute_metrics(df)
    print("\n=== Baseline metrics (non-missing values only) ===")
    for k, v in base_metrics.items():
        print(f"[{k}] {v}")

    # Strategy 1: mean/mode
    df_mean_mode = impute_mean_mode(df)
    mm_metrics = compute_metrics(df_mean_mode)
    mm_filled = count_filled_cells(df, df_mean_mode)
    print("\n=== 策略1:均值/众数 填补量(各列)===")
    print(mm_filled)
    print_metrics_diff("策略1:均值/众数", base_metrics, mm_metrics)

    # Strategy 2: mode only
    df_mode = impute_mode_only(df)
    mode_metrics = compute_metrics(df_mode)
    mode_filled = count_filled_cells(df, df_mode)
    print("\n=== 策略2:纯众数 填补量(各列)===")
    print(mode_filled)
    print_metrics_diff("策略2:纯众数", base_metrics, mode_metrics)

    # Strategy 3: linear interpolation
    df_interp = impute_linear_interpolation(df)
    interp_metrics = compute_metrics(df_interp)
    interp_filled = count_filled_cells(df, df_interp)
    print("\n=== 策略3:线性插值 填补量(各列)===")
    print(interp_filled)
    print_metrics_diff("策略3:线性插值", base_metrics, interp_metrics)

    # Optional: save results
    df_mean_mode.to_csv(f"{OUTPUT_PREFIX}_mean_mode.csv", index=False)
    df_mode.to_csv(f"{OUTPUT_PREFIX}_mode.csv", index=False)
    df_interp.to_csv(f"{OUTPUT_PREFIX}_interpolate.csv", index=False)
    print("\n已输出三种策略的填补结果到当前目录。")


if __name__ == "__main__":
    main()
```

Usage and interpretation notes:
- Dtypes are normalized first so that numeric, date, categorical, and binary columns have the expected types; rows with a missing user_id are dropped because they cannot be linked for analysis.
- Missingness detection uses missing_overview to report the missing count and rate for each column.
- The three strategies:
  - Mean/mode: mean for the continuous variable (ARPU), mode for discrete or date columns; suitable when the missing rate is very low and the distribution is stable.
  - Mode only: every column is filled with its mode; simple, but it can shift distributions, and numeric columns fall back to the median when no usable mode exists.
  - Linear interpolation: continuous variables are interpolated in order; dates are interpolated on a globally sorted timestamp scale for demonstration only (in production, interpolate within the same entity or a proper time series); discrete variables use forward/backward fill with a mode fallback.
- The metric comparison (ARPU mean/std, plan distribution, churn rate, median login delay) shows how each strategy affects distributions and key business quantities, making it easier to choose a strategy; a sketch that collects these metrics into a single table follows below.
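
If you prefer one comparison table over the printed diffs, a small sketch along the following lines (assuming df and the functions defined in the script above are in scope) collects the scalar metrics of each strategy side by side; the variable names strategies and comparison are illustrative.

```python
# Minimal sketch: gather scalar metrics per strategy into one DataFrame for review.
# Assumes df, impute_mean_mode, impute_mode_only, impute_linear_interpolation and
# compute_metrics from the script above are already defined.
import pandas as pd

strategies = {
    "baseline": df,
    "mean_mode": impute_mean_mode(df),
    "mode_only": impute_mode_only(df),
    "interpolate": impute_linear_interpolation(df),
}

rows = {}
for name, frame in strategies.items():
    metrics = compute_metrics(frame)
    # Keep only scalar metrics; plan_dist is a dict and is easier to inspect separately
    rows[name] = {k: v for k, v in metrics.items() if k != "plan_dist"}

comparison = pd.DataFrame(rows).T  # one row per strategy, one column per metric
print(comparison.round(4))
```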

Target users

Data analysts

Generate missing-value cleaning scripts for e-commerce, finance, or operations data in one step, quickly trial several imputation options, build up reusable templates, and improve the credibility of models and reports.

BI engineers

Run the missingness audit and automated imputation before data lands in the warehouse, output comparison tables and anomaly lists, keep dashboard definitions consistent, and reduce manual patching time.

Data science beginners

Understand common imputation methods through commented scripts, run them safely, see their impact, and move quickly from the basics to hands-on project work.

Product managers / growth analysts

Get a runnable script by describing the data characteristics, without deep coding, to validate business hypotheses and shorten the analysis and iteration cycle.

Operations strategy analysts

Batch-clean missing values in campaign and user data, keep sample sizes stable, and quickly produce before/after metric comparisons to support A/B tests and spend decisions.

Researchers

Generate appropriate strategies for different missingness mechanisms, preserve experimental rigor, and fully record every setting and its rationale for reproducibility.

Data engineers

Embed the generated scripts into data pipelines to reduce hand-written code and maintenance cost, lower the risk of erroneous cleaning, and improve overall data quality.

Problem solved

Using missing-value cleaning as the entry point, this template gives data analysis, algorithm, and BI teams a fast path from requirement to runnable script: the AI acts as a data-mining expert and, based on the dataset characteristics and business constraints you provide, reliably produces clearly structured Python cleaning scripts that can be embedded directly into a pipeline. It unifies processing definitions, reduces human error, shortens the data-preparation cycle, improves the accuracy of model training and reporting, and supports multilingual responses for cross-team collaboration and delivery.

Feature summary

- Generates a customized, reusable Python cleaning script from your dataset characteristics in one step, focused on missing-value handling.
- Automatically identifies field types and unusual distributions and recommends the most suitable imputation strategy (mean, mode, interpolation, and so on).
- Supports grouped imputation and multi-strategy comparison, so you can quickly see how different strategies affect metrics and sample size.
- Generates readable comments and usage notes, so newcomers can understand the reasoning behind each step and its alternatives.
- One-click run and result validation, with missing-rate comparison tables and before/after summaries for review and reporting.
- Rules can be customized per business field, flexibly skipping key or constant columns to reduce the risk of erroneous cleaning.
- Supports multilingual explanations and localized comments for smoother cross-regional collaboration and handover.
- The script structure is clear and extensible; standardization, outlier handling, and other deeper steps can be added later.
- Built-in quality checks and failure alerts surface columns that cannot be imputed or lack sufficient information, along with the reasons.
- Compatible with common analysis environments, plugging into existing notebooks and task flows with little switching cost.

How to use the purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated by the template into your usual chat application (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suitable for quick personal trials and lightweight use.

2. Publish it as an API

Turn the prompt template into an API so your programs can adjust the template parameters and call it directly, enabling automation and batch processing. Suitable for developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so your AI application can call the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between AI tools.

What you get after purchase

The complete prompt template
- 256 tokens in total
- 2 adjustable parameters: { 数据集特征 } (dataset characteristics), { 输出语言 } (output language)
Automatically added to "My prompt library"
- Prompt optimizer support
- Versioned management
Community-shared application cases (free for a limited time)