Generate a Python script to clean missing values in a specified dataset.
Below is a reusable Python script for cleaning missing values in e-commerce sales data and generating several imputation variants in one run: mean, median, mode, group-wise, and regression. Tailored to this dataset, the script handles the mixed use of 0 as a missing marker in qty/price, the relatively high missing rate of age, and the small number of blank payment_method values, and it provides engineered templates for group-wise and regression imputation that can be reused on other datasets.

Script overview
- Assumed input columns: [id, order_date, sku, qty, price, discount, customer_id, city, age, payment_method]
- Missing-value standardization: empty strings, whitespace-only strings, and common missing markers are unified to NaN; optionally, 0 in qty/price is treated as missing.
- Imputation strategies:
  - Mean: numeric columns filled with the mean; categorical columns with the mode.
  - Median: numeric columns filled with the median; categorical columns with the mode.
  - Mode: both numeric and categorical columns filled with the mode.
  - Group: qty/price filled with the per-sku median; age with the per-city median; payment_method with the per-city mode; global fallbacks are applied.
  - Regression: a separate regression model (RandomForestRegressor) per target for age/price/qty, using order-time features plus sku/city/payment_method as predictors; payment_method is still filled with the mode.
- Output: one CSV per strategy.
- Configurable options: which columns treat 0 as missing, strategy selection, output directory, and so on.

Code
Save the following as impute_sales.py and install the dependencies: pip install pandas numpy scikit-learn

```python
# impute_sales.py
# -*- coding: utf-8 -*-
"""
Missing-value cleaning and multi-strategy imputation for an e-commerce sales table.
Supported strategies: mean / median / mode / group / regression.
"""
import argparse
import os
from typing import Dict, List

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder

DEFAULT_NA_VALUES = ["", " ", " ", "NA", "N/A", "NULL", "null", "None", "none"]


def parse_args():
    parser = argparse.ArgumentParser(description="Missing-value cleaning and multi-strategy imputation for an e-commerce sales table")
    parser.add_argument("--input", type=str, required=True, help="Path to the input CSV file")
    parser.add_argument("--output-dir", type=str, required=True, help="Output directory")
    parser.add_argument(
        "--strategies",
        type=str,
        default="all",
        help="Imputation strategies, comma separated: mean,median,mode,group,regression, or all",
    )
    parser.add_argument(
        "--zero-as-missing",
        type=str,
        default="qty,price",
        help="Columns where 0 is treated as missing, comma separated; an empty string disables this",
    )
    parser.add_argument(
        "--random-state",
        type=int,
        default=42,
        help="Random seed (regression models)",
    )
    return parser.parse_args()


def load_data(path: str, na_values: List[str]) -> pd.DataFrame:
    df = pd.read_csv(path, na_values=na_values, dtype=str)
    # Strip leading/trailing whitespace so missing values are recognized consistently.
    # Use the .str accessor so NaN values are preserved (astype(str) would turn them into "nan").
    for col in df.columns:
        df[col] = df[col].str.strip()
        df.loc[df[col].isin(["", " "]), col] = np.nan
    return df


def convert_types(df: pd.DataFrame) -> pd.DataFrame:
    # Type conversion: as strict as possible without raising
    out = df.copy()
    # Dates
    out["order_date"] = pd.to_datetime(out["order_date"], errors="coerce")
    # Numeric columns
    for col in ["qty", "price", "discount", "age"]:
        out[col] = pd.to_numeric(out[col], errors="coerce")
    # Identifier columns stay as strings
    # Categorical columns stay as strings (possibly containing NaN)
    return out


def standardize_missing(df: pd.DataFrame, zero_as_missing_cols: List[str]) -> pd.DataFrame:
    out = df.copy()
    # Optionally treat 0 as missing
    for col in zero_as_missing_cols:
        if col in out.columns and pd.api.types.is_numeric_dtype(out[col]):
            out.loc[out[col] == 0, col] = np.nan
    return out


def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out["order_year"] = out["order_date"].dt.year
    out["order_month"] = out["order_date"].dt.month
    out["order_weekday"] = out["order_date"].dt.weekday
    return out


def fill_categorical_mode(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    out = df.copy()
    for col in cols:
        if col in out.columns:
            mode_vals = out[col].mode(dropna=True)
            if len(mode_vals) > 0:
                out[col] = out[col].fillna(mode_vals.iloc[0])
    return out


def impute_mean(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # Categorical columns: mode
    out = fill_categorical_mode(out, ["sku", "city", "payment_method"])
    # Numeric columns: mean
    for col in ["qty", "price", "discount", "age"]:
        if col in out.columns:
            mean_val = out[col].mean(skipna=True)
            out[col] = out[col].fillna(mean_val)
    return out


def impute_median(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    out = fill_categorical_mode(out, ["sku", "city", "payment_method"])
    for col in ["qty", "price", "discount", "age"]:
        if col in out.columns:
            med_val = out[col].median(skipna=True)
            out[col] = out[col].fillna(med_val)
    return out


def impute_mode(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # Use the mode for every column (a numeric mode may not be unique; take the first)
    for col in out.columns:
        mode_vals = out[col].mode(dropna=True)
        if len(mode_vals) > 0:
            out[col] = out[col].fillna(mode_vals.iloc[0])
    return out


def impute_group(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    # payment_method: per-city mode
    if "payment_method" in out.columns and "city" in out.columns:
        out["payment_method"] = (
            out.groupby("city")["payment_method"]
            .transform(lambda s: s.fillna(s.mode().iloc[0] if len(s.mode()) > 0 else np.nan))
        )
        # Global fallback
        pm_mode = out["payment_method"].mode(dropna=True)
        if len(pm_mode) > 0:
            out["payment_method"] = out["payment_method"].fillna(pm_mode.iloc[0])
    # qty/price: per-sku median, with the global median as fallback
    for col in ["qty", "price"]:
        if col in out.columns and "sku" in out.columns:
            out[col] = out.groupby("sku")[col].transform(lambda s: s.fillna(s.median()))
            global_med = out[col].median(skipna=True)
            out[col] = out[col].fillna(global_med)
    # age: per-city median, with the global median as fallback
    if "age" in out.columns and "city" in out.columns:
        out["age"] = out.groupby("city")["age"].transform(lambda s: s.fillna(s.median()))
        age_med = out["age"].median(skipna=True)
        out["age"] = out["age"].fillna(age_med)
    # discount: fill any missing values with the median (no grouping specified)
    if "discount" in out.columns:
        out["discount"] = out["discount"].fillna(out["discount"].median(skipna=True))
    return out


def build_regression_imputer(
    df: pd.DataFrame,
    target: str,
    random_state: int = 42,
) -> Pipeline:
    """
    Build a regression-imputation pipeline for the target column.
    Numeric features: median imputation; categorical features: mode imputation + one-hot encoding.
    Model: random forest regression (reasonably robust to non-linearity and categorical features).
    """
    # Feature columns: exclude identifier columns and the target itself.
    # Use order-time features plus other interpretable variables.
    time_features = ["order_year", "order_month", "order_weekday"]
    cat_features = [c for c in ["sku", "city", "payment_method"] if c in df.columns]
    num_features_all = [c for c in ["qty", "price", "discount", "age"] if c in df.columns]
    num_features = [c for c in num_features_all if c != target]

    # Assemble the column transformer
    transformers = []
    if len(num_features) > 0:
        transformers.append(("num", SimpleImputer(strategy="median"), num_features))
    if len(cat_features) > 0:
        transformers.append(
            (
                "cat",
                Pipeline(
                    steps=[
                        ("imputer", SimpleImputer(strategy="most_frequent")),
                        ("ohe", OneHotEncoder(handle_unknown="ignore")),
                    ]
                ),
                cat_features,
            )
        )
    if len(time_features) > 0:
        transformers.append(("time", SimpleImputer(strategy="median"), time_features))

    preprocessor = ColumnTransformer(transformers=transformers, remainder="drop")
    model = RandomForestRegressor(
        n_estimators=200,
        max_depth=None,
        random_state=random_state,
        n_jobs=-1,
    )
    pipe = Pipeline(steps=[("preprocess", preprocessor), ("model", model)])
    return pipe


def impute_regression(df: pd.DataFrame, random_state: int = 42) -> pd.DataFrame:
    out = df.copy()
    # Fill categoricals first (mode) so the model features contain no missing values
    out = fill_categorical_mode(out, ["payment_method", "sku", "city"])
    # Time features
    out = add_time_features(out)
    # Simple fill for discount so it is not heavily missing when used as a feature
    if "discount" in out.columns:
        out["discount"] = out["discount"].fillna(out["discount"].median(skipna=True))

    targets = [c for c in ["age", "price", "qty"] if c in out.columns]
    for target in targets:
        # Build and train the regression model
        pipe = build_regression_imputer(out, target=target, random_state=random_state)
        mask_train = out[target].notna()
        mask_pred = out[target].isna()
        if mask_train.sum() == 0 or mask_pred.sum() == 0:
            # Nothing to train on, or nothing to fill
            continue
        X_train = out.loc[mask_train]
        y_train = out.loc[mask_train, target]
        X_pred = out.loc[mask_pred]
        pipe.fit(X_train, y_train)
        y_hat = pipe.predict(X_pred)
        # Plausibility bounds: non-negative; age clipped to 0-100 (adjust if needed)
        if target in ["qty", "price"]:
            y_hat = np.maximum(y_hat, 0.0)
        if target == "age":
            y_hat = np.clip(y_hat, 0.0, 100.0)
        out.loc[mask_pred, target] = y_hat
    return out


def generate_outputs(df: pd.DataFrame, strategies: List[str], out_dir: str, random_state: int):
    os.makedirs(out_dir, exist_ok=True)
    results: Dict[str, pd.DataFrame] = {}
    if "mean" in strategies:
        results["mean"] = impute_mean(df)
    if "median" in strategies:
        results["median"] = impute_median(df)
    if "mode" in strategies:
        results["mode"] = impute_mode(df)
    if "group" in strategies:
        results["group"] = impute_group(df)
    if "regression" in strategies:
        results["regression"] = impute_regression(df, random_state=random_state)

    # Save outputs
    for name, dfi in results.items():
        out_path = os.path.join(out_dir, f"{name}_imputed.csv")
        dfi.to_csv(out_path, index=False)
        # Missingness report
        miss_report = dfi.isna().sum()
        print(f"[{name}] saved to: {out_path}")
        print(f"[{name}] remaining missing values per column:")
        print(miss_report.to_string())
        print("-" * 60)
    return results


def main():
    args = parse_args()
    strategies = (
        ["mean", "median", "mode", "group", "regression"]
        if args.strategies.strip().lower() == "all"
        else [s.strip().lower() for s in args.strategies.split(",")]
    )
    zero_cols = [c.strip() for c in args.zero_as_missing.split(",") if c.strip()]

    # Load and standardize
    df = load_data(args.input, na_values=DEFAULT_NA_VALUES)
    df = convert_types(df)
    df = standardize_missing(df, zero_as_missing_cols=zero_cols)

    # Generate all requested imputation variants in one run
    generate_outputs(df, strategies=strategies, out_dir=args.output_dir, random_state=args.random_state)


if __name__ == "__main__":
    main()
```

Usage examples
- Generate all strategies in one run (by default, 0 in qty and price is treated as missing):
  - python impute_sales.py --input sales.csv --output-dir clean/ --strategies all
- Generate only the group and regression variants:
  - python impute_sales.py --input sales.csv --output-dir clean/ --strategies group,regression
- Do not treat 0 as missing:
  - python impute_sales.py --input sales.csv --output-dir clean/ --zero-as-missing ""

Implementation notes and recommendations
- The mixed use of 0 and NaN in qty/price is handled through the configurable zero_as_missing option, so free items or legitimately zero values are not misclassified as missing. Enable it only where the business rules support it.
- Group-wise imputation uses within-group statistics with global fallback values, which keeps it robust when groups are small. Defaults: qty/price by per-sku median, age and payment_method by city; adjust the grouping keys to match the actual business structure.
- Regression imputation builds an independent model per numeric column, with time features and one-hot encoded categoricals, which can improve imputation quality when the missing rate is high. Random forest regression balances non-linearity and categorical handling; gradient boosting or XGBoost can be substituted for higher accuracy.
- The categorical column payment_method has few missing values here, so mode imputation is stable and cheap. If category prediction is needed later, a classification model can be added.
- It is recommended to run an offline evaluation across the generated variants (for example a hold-out evaluation on rows whose true values are known) and pick the one that best matches the business metrics before using it in production (a minimal hold-out sketch follows at the end of this section).

The script can be reused as a template on other table schemas. For a different dataset you only need to:
- adjust the column type handling and feature engineering;
- configure the grouping keys and model features;
- set zero_as_missing and the clipping bounds according to the business rules.
Below is a self-contained Python script that performs a pre-ingestion missingness audit, imputes missing values according to the specified rules, generates a before/after comparison for changed fields, and outputs an anomalies list. It is designed for the daily transaction wide table with columns [txn_id, ts, branch, channel, amount, fee].

Imputation rules:
- amount and fee: group-wise imputation using branch + channel first, then branch-only, then channel-only, then global; the default statistic is the median (configurable).
- ts: forward-fill within each (branch, channel) group after sorting by ts ascending; rows in groups with no observed timestamp remain missing (no backfill).

This script also enforces consistent definitions and tracking:
- All imputations are sourced from non-missing values only.
- Imputation sources are recorded per field.
- The pre-ingestion missingness audit is persisted.
- Residual missing values and duplicate txn_id records are flagged as anomalies.

You can run: python clean_transactions.py --input /path/to/input.csv --outdir ./out --impute-stat median

Code:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
from typing import Tuple, Dict

import pandas as pd
import numpy as np


def compute_missingness(df: pd.DataFrame, cols: list) -> pd.DataFrame:
    total = len(df)
    audit = []
    for c in cols:
        nulls = df[c].isna().sum()
        audit.append({
            "column": c,
            "null_count": int(nulls),
            "total": int(total),
            "null_rate": float(nulls / total if total else np.nan),
        })
    return pd.DataFrame(audit)


def ensure_dtypes(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, pd.Series]]:
    original = {}
    for col in ["txn_id", "ts", "branch", "channel", "amount", "fee"]:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")
    # Preserve originals for anomaly detection
    for c in ["ts", "amount", "fee"]:
        original[f"orig_{c}"] = df[c].copy()
    # Coerce types
    df["txn_id"] = df["txn_id"].astype(str)
    df["branch"] = df["branch"].astype(str)
    df["channel"] = df["channel"].astype(str)
    df["ts"] = pd.to_datetime(df["ts"], errors="coerce", utc=False)
    df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    df["fee"] = pd.to_numeric(df["fee"], errors="coerce")
    return df, original


def build_group_stats(df: pd.DataFrame, stat: str = "median") -> Dict[str, pd.DataFrame]:
    # Use the named aggregation so missing values are skipped (imputations are sourced
    # from non-missing values only).
    agg_func = "median" if stat == "median" else "mean"
    use_cols = ["amount", "fee"]
    g_bc = df.groupby(["branch", "channel"], dropna=False)[use_cols].agg(agg_func).rename(columns={
        "amount": "g_bc_amount", "fee": "g_bc_fee"
    })
    g_b = df.groupby(["branch"], dropna=False)[use_cols].agg(agg_func).rename(columns={
        "amount": "g_b_amount", "fee": "g_b_fee"
    })
    g_c = df.groupby(["channel"], dropna=False)[use_cols].agg(agg_func).rename(columns={
        "amount": "g_c_amount", "fee": "g_c_fee"
    })
    g_all = df[use_cols].agg(agg_func).to_dict()  # {"amount": x, "fee": y}
    return {"g_bc": g_bc, "g_b": g_b, "g_c": g_c, "g_all": g_all}


def impute_amount_fee(df: pd.DataFrame, stat: str = "median") -> pd.DataFrame:
    stats = build_group_stats(df, stat=stat)
    df = df.merge(stats["g_bc"], how="left", left_on=["branch", "channel"], right_index=True)
    df = df.merge(stats["g_b"], how="left", on="branch")
    df = df.merge(stats["g_c"], how="left", on="channel")

    # Prepare targets
    df["amount_impute_source"] = None
    df["fee_impute_source"] = None

    # Amount: branch+channel -> branch -> channel -> global
    amount_filled = df["amount"].copy()
    src = pd.Series(index=df.index, dtype=object)
    mask = amount_filled.isna() & df["g_bc_amount"].notna()
    amount_filled[mask] = df.loc[mask, "g_bc_amount"]
    src[mask] = "branch+channel"
    mask = amount_filled.isna() & df["g_b_amount"].notna()
    amount_filled[mask] = df.loc[mask, "g_b_amount"]
    src[mask] = "branch"
    mask = amount_filled.isna() & df["g_c_amount"].notna()
    amount_filled[mask] = df.loc[mask, "g_c_amount"]
    src[mask] = "channel"
    if np.isfinite(stats["g_all"]["amount"]) and not pd.isna(stats["g_all"]["amount"]):
        mask = amount_filled.isna()
        amount_filled[mask] = stats["g_all"]["amount"]
        src[mask] = "global"
    df["amount_impute_source"] = src.where(df["amount"].isna(), None)
    df["amount_filled"] = amount_filled
    df["amount_was_imputed"] = df["amount"].isna() & df["amount_filled"].notna()

    # Fee: same fallback chain
    fee_filled = df["fee"].copy()
    src = pd.Series(index=df.index, dtype=object)
    mask = fee_filled.isna() & df["g_bc_fee"].notna()
    fee_filled[mask] = df.loc[mask, "g_bc_fee"]
    src[mask] = "branch+channel"
    mask = fee_filled.isna() & df["g_b_fee"].notna()
    fee_filled[mask] = df.loc[mask, "g_b_fee"]
    src[mask] = "branch"
    mask = fee_filled.isna() & df["g_c_fee"].notna()
    fee_filled[mask] = df.loc[mask, "g_c_fee"]
    src[mask] = "channel"
    if np.isfinite(stats["g_all"]["fee"]) and not pd.isna(stats["g_all"]["fee"]):
        mask = fee_filled.isna()
        fee_filled[mask] = stats["g_all"]["fee"]
        src[mask] = "global"
    df["fee_impute_source"] = src.where(df["fee"].isna(), None)
    df["fee_filled"] = fee_filled
    df["fee_was_imputed"] = df["fee"].isna() & df["fee_filled"].notna()

    return df


def impute_ts_ffill(df: pd.DataFrame) -> pd.DataFrame:
    # Forward fill within (branch, channel), sorted by ts ascending, then txn_id for deterministic order.
    # Rows with missing ts sort to the end of their group and inherit the group's latest observed
    # timestamp; groups with no observed ts at all remain missing (no backfill).
    order_cols = ["branch", "channel", "ts", "txn_id"]
    df_sorted = df.sort_values(order_cols, na_position="last").copy()
    ts_ffilled = df_sorted.groupby(["branch", "channel"], dropna=False)["ts"].ffill()
    df_sorted["ts_filled"] = ts_ffilled
    # Assigning a Series aligns on the preserved original index, restoring row order
    df["ts_filled"] = df_sorted["ts_filled"]
    df["ts_was_imputed"] = df["ts"].isna() & df["ts_filled"].notna()
    return df


def build_before_after(df: pd.DataFrame) -> pd.DataFrame:
    changes = []
    for col, filled_col, src_col in [
        ("amount", "amount_filled", "amount_impute_source"),
        ("fee", "fee_filled", "fee_impute_source"),
        ("ts", "ts_filled", None),
    ]:
        # A value counts as changed if old and new differ and they are not both missing
        both_na = df[col].isna() & df[filled_col].isna()
        diff_mask = ~df[col].eq(df[filled_col]) & ~both_na
        if diff_mask.any():
            tmp = pd.DataFrame({
                "txn_id": df.loc[diff_mask, "txn_id"].astype(str),
                "field": col,
                "before": df.loc[diff_mask, col].astype(str),
                "after": df.loc[diff_mask, filled_col].astype(str),
            })
            if src_col:
                tmp["impute_source"] = df.loc[diff_mask, src_col].astype(str)
            else:
                tmp["impute_source"] = "ffill"
            changes.append(tmp)
    if changes:
        return pd.concat(changes, ignore_index=True)
    return pd.DataFrame(columns=["txn_id", "field", "before", "after", "impute_source"])


def build_anomalies(df: pd.DataFrame, original: Dict[str, pd.Series]) -> pd.DataFrame:
    anomalies = []
    # Duplicated txn_id
    dups = df[df["txn_id"].duplicated(keep=False)].copy()
    if not dups.empty:
        dups["anomaly_type"] = "duplicate_txn_id"
        anomalies.append(dups[["txn_id", "anomaly_type"]])
    # Rows still missing after imputation in critical fields
    still_missing = df[df["ts_filled"].isna() | df["amount_filled"].isna() | df["fee_filled"].isna()].copy()
    if not still_missing.empty:
        still_missing["anomaly_type"] = "residual_missing_after_imputation"
        anomalies.append(still_missing[["txn_id", "anomaly_type"]])
    # Coercion-induced NaNs for ts/amount/fee
    for col, orig_col in [("ts", "orig_ts"), ("amount", "orig_amount"), ("fee", "orig_fee")]:
        coerced_nan_mask = original[orig_col].notna() & df[col].isna()
        if coerced_nan_mask.any():
            rows = df.loc[coerced_nan_mask, ["txn_id"]].copy()
            rows["anomaly_type"] = f"type_coercion_nan_{col}"
            anomalies.append(rows)
    if anomalies:
        return pd.concat(anomalies, ignore_index=True).drop_duplicates()
    return pd.DataFrame(columns=["txn_id", "anomaly_type"])


def imputation_summary(df: pd.DataFrame) -> pd.DataFrame:
    rows = []
    for field, src_col, flag_col in [
        ("amount", "amount_impute_source", "amount_was_imputed"),
        ("fee", "fee_impute_source", "fee_was_imputed"),
    ]:
        # Distribution of imputation sources
        src_counts = df.loc[df[flag_col], src_col].value_counts(dropna=False)
        for src, cnt in src_counts.items():
            rows.append({"field": field, "impute_source": src if src is not None else "none", "count": int(cnt)})
        # Total imputed
        rows.append({"field": field, "impute_source": "TOTAL_IMPUTED", "count": int(df[flag_col].sum())})
    # ts ffill count
    rows.append({"field": "ts", "impute_source": "ffill", "count": int(df["ts_was_imputed"].sum())})
    return pd.DataFrame(rows)


def main():
    parser = argparse.ArgumentParser(description="Clean daily transaction wide table: audit missingness, impute, and output artifacts.")
    parser.add_argument("--input", required=True, help="Input CSV path with columns [txn_id, ts, branch, channel, amount, fee]")
    parser.add_argument("--outdir", required=True, help="Output directory")
    parser.add_argument("--impute-stat", choices=["median", "mean"], default="median", help="Statistic used for amount/fee imputation")
    parser.add_argument("--encoding", default="utf-8", help="CSV encoding")
    parser.add_argument("--sep", default=",", help="CSV separator (default ',')")
    args = parser.parse_args()

    os.makedirs(args.outdir, exist_ok=True)

    # Load
    df_raw = pd.read_csv(args.input, encoding=args.encoding, sep=args.sep, dtype=str)
    # Preserve a working numeric/date copy
    df = df_raw.copy()

    # Pre-ingestion missingness audit (before any coercion/imputation)
    pre_ingestion_audit = compute_missingness(df, ["txn_id", "ts", "branch", "channel", "amount", "fee"])
    pre_ingestion_audit.to_csv(os.path.join(args.outdir, "audit_missingness_before.csv"), index=False)

    # Type coercion and validation
    df, original = ensure_dtypes(df)

    # Impute amount/fee
    df = impute_amount_fee(df, stat=args.impute_stat)

    # Forward fill ts within (branch, channel)
    df = impute_ts_ffill(df)

    # Build comparison table (before vs after) for changed values
    before_after = build_before_after(df)
    before_after.to_csv(os.path.join(args.outdir, "before_after_changes.csv"), index=False)

    # Anomalies
    anomalies = build_anomalies(df, original)
    anomalies.to_csv(os.path.join(args.outdir, "anomalies.csv"), index=False)

    # Imputation summary
    summary = imputation_summary(df)
    summary.to_csv(os.path.join(args.outdir, "imputation_summary.csv"), index=False)

    # Cleaned dataset (with filled values and explicit lineage fields)
    cleaned = df.copy()
    cleaned["amount"] = cleaned["amount_filled"]
    cleaned["fee"] = cleaned["fee_filled"]
    cleaned["ts"] = cleaned["ts_filled"]
    # Keep key lineage columns; drop helper columns not needed downstream
    drop_cols = [
        "g_bc_amount", "g_bc_fee",
        "g_b_amount", "g_b_fee",
        "g_c_amount", "g_c_fee",
        "amount_filled", "fee_filled", "ts_filled",
    ]
    cleaned = cleaned.drop(columns=[c for c in drop_cols if c in cleaned.columns])
    cleaned.to_csv(os.path.join(args.outdir, "transactions_cleaned.csv"), index=False)

    # Post-imputation missingness audit for verification of consistency
    post_audit = compute_missingness(cleaned, ["txn_id", "ts", "branch", "channel", "amount", "fee"])
    post_audit.to_csv(os.path.join(args.outdir, "audit_missingness_after.csv"), index=False)

    # Basic stdout log
    print("Completed.")
    print(f"Rows: {len(df)}")
    print(f"Pre-ingestion missingness audit: {os.path.join(args.outdir, 'audit_missingness_before.csv')}")
    print(f"Before/After changes: {os.path.join(args.outdir, 'before_after_changes.csv')}")
    print(f"Anomalies: {os.path.join(args.outdir, 'anomalies.csv')}")
    print(f"Imputation summary: {os.path.join(args.outdir, 'imputation_summary.csv')}")
    print(f"Cleaned dataset: {os.path.join(args.outdir, 'transactions_cleaned.csv')}")
    print(f"Post-imputation missingness audit: {os.path.join(args.outdir, 'audit_missingness_after.csv')}")


if __name__ == "__main__":
    pd.set_option("future.no_silent_downcasting", True)
    main()
```

Notes and rationale:
- Grouped imputation for amount and fee follows a fallback order that keeps values within-group wherever possible: branch+channel → branch → channel → global (see the toy example below). The default statistic is the median for robustness; switch to the mean via --impute-stat if needed.
- Timestamp forward fill is constrained within (branch, channel) groups and only sources observed timestamps; rows in groups with no observed ts remain missing (no backfill) and are listed in anomalies.
- The before/after comparison includes only fields whose value actually changed and records the imputation source (ffill for ts).
- Anomalies include duplicate txn_id values, residual missing values after imputation, and rows where type coercion produced NaNs (indicating invalid source values).
- All files are written to the specified output directory to support auditability and consistent processing definitions.
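To make the fallback behaviour concrete, here is a small toy example (not part of the script) that exercises impute_amount_fee directly. It assumes the code above has been saved as clean_transactions.py so the function can be imported, and the values are illustrative.

```python
# Toy check of the amount/fee fallback order (illustrative only).
import numpy as np
import pandas as pd

from clean_transactions import impute_amount_fee  # assumes the script above is saved under this name

toy = pd.DataFrame({
    "txn_id": ["t1", "t2", "t3", "t4"],
    "ts": pd.to_datetime(["2024-01-01", "2024-01-01", "2024-01-02", "2024-01-02"]),
    "branch": ["B1", "B1", "B2", "B2"],
    "channel": ["online", "online", "pos", "pos"],
    "amount": [100.0, np.nan, np.nan, 50.0],
    "fee": [1.0, 2.0, np.nan, np.nan],
})
out = impute_amount_fee(toy, stat="median")
print(out[["txn_id", "amount_filled", "amount_impute_source", "fee_filled", "fee_impute_source"]])
# Under the rules above, the missing amounts fill from their (branch, channel) medians,
# while the missing fees (the whole B2/pos group is missing) fall back to the global median.
```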
Below is a commented Python script that performs the following steps:
- Reads the teaching-operations table and normalizes the basic column types
- Profiles the missing values in each column (counts and percentages)
- Trial-runs three imputation strategies: mean/mode, mode only, and linear interpolation
- Compares how each strategy affects key statistics (ARPU mean/variance, plan distribution, churn rate, median login delay)
- Reports how many missing cells each strategy filled and how the metrics differ from the baseline

Notes:
- The columns are assumed to be [user_id, signup_day, last_login, plan, arpu, churn_flag]
- Date interpolation converts dates to integer timestamps and interpolates linearly after sorting by time (demonstration only; interpolating dates across user rows should be treated with caution in practice)
- Categorical variables are not suited to linear interpolation, so forward/backward fill with a mode fallback is used instead
- churn_flag is expected to be binary {0, 1}

Change INPUT_PATH to the actual file path as needed.

The code:

```python
import pandas as pd
import numpy as np

# =========================
# Configuration
# =========================
INPUT_PATH = "path/to/jiaoxue_yunying.csv"  # change to the actual path
OUTPUT_PREFIX = "imputed_teaching_ops"      # output file name prefix, adjust as needed

pd.set_option("display.width", 120)
pd.set_option("display.max_columns", 20)


# =========================
# Helper functions
# =========================
def coerce_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize column types:
    - user_id is an identifier: cast to string; rows with a missing user_id cannot be linked to a user and are dropped
    - signup_day / last_login: cast to datetime64[ns]
    - plan: keep as string/categorical
    - arpu: cast to float
    - churn_flag: cast to the nullable Int64 type, accepting only {0, 1}
    """
    df = df.copy()

    # Identifier: cast to string
    if "user_id" in df.columns:
        df["user_id"] = df["user_id"].astype(str).str.strip()
        # Treat empty strings as missing
        df.loc[df["user_id"].isin(["", "nan", "None"]), "user_id"] = np.nan

    # Date columns
    for col in ["signup_day", "last_login"]:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")

    # Categorical column
    if "plan" in df.columns:
        df["plan"] = df["plan"].astype(str).str.strip()
        df.loc[df["plan"].isin(["", "nan", "None"]), "plan"] = np.nan

    # Numeric column
    if "arpu" in df.columns:
        df["arpu"] = pd.to_numeric(df["arpu"], errors="coerce")

    # Binary flag: accept only {0, 1}
    if "churn_flag" in df.columns:
        s = df["churn_flag"]
        s = s.replace({
            True: 1, False: 0,
            "True": 1, "False": 0,
            "true": 1, "false": 0,
            "1": 1, "0": 0
        })
        s = pd.to_numeric(s, errors="coerce")
        s = s.where(s.isin([0, 1]), np.nan)
        df["churn_flag"] = s.astype("Int64")

    # Drop rows with a missing user_id (they cannot be linked to a user)
    if "user_id" in df.columns:
        before = len(df)
        df = df[~df["user_id"].isna()].copy()
        after = len(df)
        if before != after:
            print(f"Dropped rows with missing user_id: {before - after}")

    return df


def missing_overview(df: pd.DataFrame) -> pd.DataFrame:
    """Return a missing-value overview (counts and percentages)."""
    na_count = df.isna().sum()
    na_pct = df.isna().mean().round(4)
    overview = pd.DataFrame({"missing_count": na_count, "missing_pct": na_pct})
    return overview


def series_mode_safe(s: pd.Series):
    """
    Safe mode: if there are multiple modes, take the first. If no mode exists
    (e.g. no non-missing values), fall back to the median for numeric columns
    and keep the value missing for non-numeric columns.
    """
    modes = s.mode(dropna=True)
    if len(modes) > 0:
        return modes.iloc[0]
    # Fallback when no mode exists
    if pd.api.types.is_numeric_dtype(s):
        return s.median(skipna=True)
    else:
        return np.nan  # do not force a fallback for non-numeric columns


def interpolate_datetime_globally(s: pd.Series) -> pd.Series:
    """
    Linearly interpolate a datetime column (demonstration only):
    - convert observed timestamps to nanosecond integers; map NaT to NaN
    - sort by value, interpolate linearly, then map back to the original index
    Note: global interpolation across users imposes a temporal-structure assumption;
    it is only intended for demonstrations with very few missing values. In practice,
    interpolate within the same entity or along a proper time series.
    """
    s = s.copy()
    # Convert observed timestamps to float nanoseconds; NaT becomes NaN
    ts = pd.Series(np.full(len(s), np.nan), index=s.index)
    notna = s.notna()
    ts[notna] = s[notna].astype("int64").astype("float64")
    # Interpolate after sorting by value (NaT sorts last)
    order_idx = s.sort_values().index
    ts_sorted = ts.loc[order_idx]
    ts_interp_sorted = ts_sorted.interpolate(method="linear", limit_direction="both")
    # Map back to the original index order
    ts_interp = ts_interp_sorted.reindex(s.index)
    # Convert back to datetime
    s_interp = pd.to_datetime(ts_interp, unit="ns")
    return s_interp


def compute_metrics(df: pd.DataFrame) -> dict:
    """
    Compute the key metrics used for comparison:
    - arpu_mean, arpu_std
    - churn_rate (mean of churn_flag)
    - plan_dist (normalized plan distribution)
    - median_login_delay_days (median of last_login - signup_day in days)
    """
    metrics = {}

    # ARPU
    if "arpu" in df.columns:
        metrics["arpu_mean"] = df["arpu"].mean(skipna=True)
        metrics["arpu_std"] = df["arpu"].std(skipna=True)

    # Churn rate
    if "churn_flag" in df.columns:
        # The mean of a nullable integer column skips missing values by default
        metrics["churn_rate"] = df["churn_flag"].mean(skipna=True)

    # Plan distribution (normalized)
    if "plan" in df.columns:
        plan_dist = df["plan"].value_counts(normalize=True, dropna=True)
        # Convert to a dict for easier printing
        metrics["plan_dist"] = plan_dist.to_dict()

    # Median login delay (days)
    if set(["signup_day", "last_login"]).issubset(df.columns):
        mask = df["signup_day"].notna() & df["last_login"].notna()
        if mask.any():
            deltas = (df.loc[mask, "last_login"] - df.loc[mask, "signup_day"]).dt.days
            metrics["median_login_delay_days"] = float(deltas.median())
        else:
            metrics["median_login_delay_days"] = np.nan

    return metrics


def print_metrics_diff(title: str, base: dict, new: dict):
    """Print the differences between two sets of metrics."""
    print(f"\n=== {title} metric comparison (difference vs baseline) ===")
    keys = sorted(set(base.keys()) | set(new.keys()))
    for k in keys:
        if k == "plan_dist":
            print(f"[{k}]")
            base_dist = base.get(k, {})
            new_dist = new.get(k, {})
            all_plans = sorted(set(base_dist.keys()) | set(new_dist.keys()))
            for p in all_plans:
                b = base_dist.get(p, 0.0)
                n = new_dist.get(p, 0.0)
                diff = n - b
                print(f"  - {p}: baseline={b:.4f}, new={n:.4f}, diff={diff:+.4f}")
        else:
            b = base.get(k, np.nan)
            n = new.get(k, np.nan)
            if pd.notna(b) and pd.notna(n):
                diff = n - b
                print(f"[{k}] baseline={b:.6f}, new={n:.6f}, diff={diff:+.6f}")
            else:
                print(f"[{k}] baseline={b}, new={n}, diff=not computable")


# =========================
# Imputation strategies
# =========================
def impute_mean_mode(df: pd.DataFrame) -> pd.DataFrame:
    """Strategy 1: mean for numeric columns; mode for categorical/date columns."""
    out = df.copy()

    # Numeric column: arpu -> mean
    if "arpu" in out.columns:
        mean_val = out["arpu"].mean(skipna=True)
        out["arpu"] = out["arpu"].fillna(mean_val)

    # Binary column: churn_flag -> mode
    if "churn_flag" in out.columns:
        mode_val = series_mode_safe(out["churn_flag"])
        out["churn_flag"] = out["churn_flag"].fillna(mode_val)

    # Categorical column: plan -> mode
    if "plan" in out.columns:
        mode_val = series_mode_safe(out["plan"])
        out["plan"] = out["plan"].fillna(mode_val)

    # Date columns: mode (the most frequent date)
    for col in ["signup_day", "last_login"]:
        if col in out.columns:
            mode_val = series_mode_safe(out[col])
            out[col] = out[col].fillna(mode_val)

    return out


def impute_mode_only(df: pd.DataFrame) -> pd.DataFrame:
    """Strategy 2: fill every column with its mode (numeric columns too; fall back to the median or keep missing when no mode exists)."""
    out = df.copy()
    for col in out.columns:
        mode_val = series_mode_safe(out[col])
        out[col] = out[col].fillna(mode_val)
    return out


def impute_linear_interpolation(df: pd.DataFrame) -> pd.DataFrame:
    """
    Strategy 3: linear interpolation for continuous columns; forward/backward fill with a mode fallback for discrete columns.
    - arpu: linear interpolation
    - signup_day / last_login: linear interpolation on integer timestamps (demonstration only)
    - plan / churn_flag: forward fill, then backward fill, then mode fallback
    """
    out = df.copy()

    # Continuous: linear interpolation for ARPU (in the original row order)
    if "arpu" in out.columns:
        out["arpu"] = out["arpu"].interpolate(method="linear", limit_direction="both")

    # Date columns: global time-ordered linear interpolation (demonstration only, use with care)
    for col in ["signup_day", "last_login"]:
        if col in out.columns:
            out[col] = interpolate_datetime_globally(out[col])

    # Discrete: ffill/bfill first, then fall back to the mode
    for col in ["plan", "churn_flag"]:
        if col in out.columns:
            out[col] = out[col].ffill().bfill()
            mode_val = series_mode_safe(out[col])
            out[col] = out[col].fillna(mode_val)

    return out


def count_filled_cells(original: pd.DataFrame, imputed: pd.DataFrame) -> pd.Series:
    """Count the number of cells filled per column (original missing minus remaining missing)."""
    return original.isna().sum() - imputed.isna().sum()


# =========================
# Main workflow
# =========================
def main():
    # Load the data
    df = pd.read_csv(INPUT_PATH)

    # Normalize types and do basic cleaning
    df = coerce_dtypes(df)

    # Check that all required columns are present
    required_cols = ["user_id", "signup_day", "last_login", "plan", "arpu", "churn_flag"]
    missing_cols = [c for c in required_cols if c not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")

    # Missingness overview
    print("\n=== Missing-value overview (cleaned raw data) ===")
    overview = missing_overview(df)
    print(overview)

    # Baseline metrics (no imputation; computed on the available non-missing data)
    base_metrics = compute_metrics(df)
    print("\n=== Baseline metrics (computed on non-missing data) ===")
    for k, v in base_metrics.items():
        if k == "plan_dist":
            print(f"[{k}] {v}")
        else:
            print(f"[{k}] {v}")

    # Strategy 1: mean/mode
    df_mean_mode = impute_mean_mode(df)
    mm_metrics = compute_metrics(df_mean_mode)
    mm_filled = count_filled_cells(df, df_mean_mode)
    print("\n=== Strategy 1: mean/mode, cells filled per column ===")
    print(mm_filled)
    print_metrics_diff("Strategy 1: mean/mode", base_metrics, mm_metrics)

    # Strategy 2: mode only
    df_mode = impute_mode_only(df)
    mode_metrics = compute_metrics(df_mode)
    mode_filled = count_filled_cells(df, df_mode)
    print("\n=== Strategy 2: mode only, cells filled per column ===")
    print(mode_filled)
    print_metrics_diff("Strategy 2: mode only", base_metrics, mode_metrics)

    # Strategy 3: linear interpolation
    df_interp = impute_linear_interpolation(df)
    interp_metrics = compute_metrics(df_interp)
    interp_filled = count_filled_cells(df, df_interp)
    print("\n=== Strategy 3: linear interpolation, cells filled per column ===")
    print(interp_filled)
    print_metrics_diff("Strategy 3: linear interpolation", base_metrics, interp_metrics)

    # Optional: save the results
    df_mean_mode.to_csv(f"{OUTPUT_PREFIX}_mean_mode.csv", index=False)
    df_mode.to_csv(f"{OUTPUT_PREFIX}_mode.csv", index=False)
    df_interp.to_csv(f"{OUTPUT_PREFIX}_interpolate.csv", index=False)
    print("\nSaved the imputed results of all three strategies to the current directory.")


if __name__ == "__main__":
    main()
```

Usage and interpretation notes:
- Types are normalized first so that numeric, date, categorical, and binary columns match expectations; rows with a missing user_id are dropped because they cannot be linked in downstream analysis.
- Missingness is profiled with missing_overview, which reports the count and percentage of missing values per column.
- The three strategies:
  - Mean/mode: mean for the continuous variable (ARPU), mode for discrete and date columns; suitable when the missing rate is very low and the distribution is stable.
  - Mode only: every column filled with its mode; simple, but it can shift distributions; numeric columns fall back to the median when no mode exists.
  - Linear interpolation: continuous variables interpolated in order; dates interpolated on globally sorted timestamps (demonstration only; in production, interpolate within the same entity or along a proper time series); discrete variables forward/backward filled with a mode fallback.
- The metric comparison (ARPU mean/variance, plan distribution, churn rate, median login delay) shows how each strategy affects distributions and key business quantities, which helps in choosing the appropriate strategy.
- The same helpers can be reused to trial additional strategies; a group-wise sketch follows below.
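The comparison helpers (compute_metrics, count_filled_cells, print_metrics_diff) make it straightforward to slot in further strategies. Below is a minimal sketch of a hypothetical fourth, group-wise strategy (not part of the script above) that fills arpu with the per-plan median and the remaining columns with their modes; the commented usage lines assume the helpers from the script are in scope.

```python
# Hypothetical fourth strategy: group-wise imputation of arpu by plan (illustrative sketch).
import numpy as np
import pandas as pd


def impute_group_by_plan(df: pd.DataFrame) -> pd.DataFrame:
    out = df.copy()
    if "arpu" in out.columns and "plan" in out.columns:
        # Per-plan median first, then the global median for rows whose plan has no observed arpu
        out["arpu"] = out.groupby("plan")["arpu"].transform(lambda s: s.fillna(s.median()))
        out["arpu"] = out["arpu"].fillna(out["arpu"].median(skipna=True))
    for col in ["plan", "churn_flag", "signup_day", "last_login"]:
        if col in out.columns:
            mode_vals = out[col].mode(dropna=True)
            if len(mode_vals) > 0:
                out[col] = out[col].fillna(mode_vals.iloc[0])
    return out


# Compared with the same helpers as the other strategies (assuming they are in scope):
# df_group = impute_group_by_plan(df)
# print(count_filled_cells(df, df_group))
# print_metrics_diff("Strategy 4: group by plan", base_metrics, compute_metrics(df_group))
```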
Generate missing-value cleaning scripts for e-commerce, finance, or operations data in one step, quickly trial multiple imputation strategies, accumulate reusable templates, and improve the credibility of models and reports.
Complete missingness audits and automatic imputation before data is ingested, output before/after comparison tables and anomaly lists, keep dashboard definitions consistent, and cut the time spent on manual fixes.
Learn common imputation methods through commented scripts, trial-run them safely and inspect their impact, and move quickly from the basics to hands-on project work.
No heavy coding required: describe your data and get a runnable script, validate business hypotheses, and shorten the analysis and iteration cycle.
Clean missing values in campaign and user data in bulk, keep sample sizes stable, quickly produce before/after metric comparisons, and support A/B testing and ad-spend decisions.
Generate appropriate strategies for different missingness mechanisms, preserve experimental rigor, and fully record the settings and rationale of every step to make papers reproducible.
Embed the generated scripts in data processing pipelines to reduce hand-written code and maintenance cost, lower the risk of incorrect cleaning, and raise overall data quality.
Using missing-value cleaning as the entry point, this gives data analysis, algorithm, and BI teams a fast path from requirements to a runnable script: the AI acts as a data-mining expert and, based on the dataset characteristics and business constraints you provide, reliably produces clearly structured Python cleaning scripts that can be embedded directly in a pipeline. It unifies processing definitions, reduces human error, shortens the data preparation cycle, improves the accuracy of model training and report output, and supports responses in multiple languages for cross-team collaboration and delivery.
Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing easy. Suitable for developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client so that your AI application calls the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts work seamlessly across different AI tools.