Popular roles are not just a source of inspiration; they are also productivity assistants. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs, making creation easier and the value more direct.
We keep updating the role library for different user needs, so you can always find a suitable starting point for inspiration.
Generate a Python script for cleaning missing values in a specified dataset.
The following is a reusable Python script for cleaning and imputing missing values in e-commerce sales data. It can generate several imputation variants in one run: mean, median, mode, group-based, and regression. Tailored to this dataset, the script handles the mixed use of 0 as a missing marker in qty/price, the relatively high missing rate in age, and a small number of blanks in payment_method, and it provides engineering-ready templates for group-based and regression imputation that can be reused on other datasets.
Script overview
- Input: a CSV of the e-commerce sales table (columns such as order_date, sku, city, payment_method, qty, price, discount, age).
- Output: one imputed CSV per selected strategy (named {strategy}_imputed.csv) plus a per-column residual-missingness report printed to stdout.
Code
Save the following as impute_sales.py and install the dependencies: pip install pandas numpy scikit-learn
# impute_sales.py
# -*- coding: utf-8 -*-
"""
电商销售表缺失值清理与多方案填补脚本
支持方案:均值/中位数/众数/分组/回归
"""
import argparse
import os
from typing import Dict, List
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
DEFAULT_NA_VALUES = ["", " ", " ", "NA", "N/A", "NULL", "null", "None", "none"]
def parse_args():
parser = argparse.ArgumentParser(description="电商销售表缺失值清理与多方案填补")
parser.add_argument("--input", type=str, required=True, help="输入CSV文件路径")
parser.add_argument("--output-dir", type=str, required=True, help="输出目录")
parser.add_argument(
"--strategies",
type=str,
default="all",
help="填补策略,逗号分隔:mean,median,mode,group,regression 或 all",
)
parser.add_argument(
"--zero-as-missing",
type=str,
default="qty,price",
help="将0视为缺失的列,逗号分隔;为空表示不启用",
)
parser.add_argument(
"--random-state",
type=int,
default=42,
help="随机种子(回归模型)",
)
return parser.parse_args()
def load_data(path: str, na_values: List[str]) -> pd.DataFrame:
df = pd.read_csv(path, na_values=na_values, dtype=str)
# 去除首尾空白以统一缺失识别
for col in df.columns:
        df[col] = df[col].str.strip()
df.loc[df[col].isin(["", " "]), col] = np.nan
return df
def convert_types(df: pd.DataFrame) -> pd.DataFrame:
# 类型转换,尽量严格但避免抛异常
out = df.copy()
# 日期
out["order_date"] = pd.to_datetime(out["order_date"], errors="coerce")
# 数值列
for col in ["qty", "price", "discount", "age"]:
out[col] = pd.to_numeric(out[col], errors="coerce")
# 标识列保留字符串
# 类别列保持字符串(可能含NaN)
return out
def standardize_missing(df: pd.DataFrame, zero_as_missing_cols: List[str]) -> pd.DataFrame:
out = df.copy()
# 0视为缺失(可选)
for col in zero_as_missing_cols:
if col in out.columns and pd.api.types.is_numeric_dtype(out[col]):
out.loc[out[col] == 0, col] = np.nan
return out
def add_time_features(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
out["order_year"] = out["order_date"].dt.year
out["order_month"] = out["order_date"].dt.month
out["order_weekday"] = out["order_date"].dt.weekday
return out
def fill_categorical_mode(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
out = df.copy()
for col in cols:
if col in out.columns:
mode_vals = out[col].mode(dropna=True)
if len(mode_vals) > 0:
out[col] = out[col].fillna(mode_vals.iloc[0])
return out
def impute_mean(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
# 类别列:众数
out = fill_categorical_mode(out, ["sku", "city", "payment_method"])
# 数值列:均值
for col in ["qty", "price", "discount", "age"]:
if col in out.columns:
mean_val = out[col].mean(skipna=True)
out[col] = out[col].fillna(mean_val)
return out
def impute_median(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
out = fill_categorical_mode(out, ["sku", "city", "payment_method"])
for col in ["qty", "price", "discount", "age"]:
if col in out.columns:
med_val = out[col].median(skipna=True)
out[col] = out[col].fillna(med_val)
return out
def impute_mode(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
# 所有列用众数(数值列众数可能不唯一,选第一个)
for col in out.columns:
mode_vals = out[col].mode(dropna=True)
if len(mode_vals) > 0:
out[col] = out[col].fillna(mode_vals.iloc[0])
return out
def impute_group(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
# payment_method按city众数
if "payment_method" in out.columns and "city" in out.columns:
out["payment_method"] = (
out.groupby("city")["payment_method"]
.transform(lambda s: s.fillna(s.mode().iloc[0] if len(s.mode()) > 0 else np.nan))
)
# 全局回落
pm_mode = out["payment_method"].mode(dropna=True)
if len(pm_mode) > 0:
out["payment_method"] = out["payment_method"].fillna(pm_mode.iloc[0])
# qty/price按sku中位数;全局回落用中位数
for col in ["qty", "price"]:
if col in out.columns and "sku" in out.columns:
out[col] = out.groupby("sku")[col].transform(lambda s: s.fillna(s.median()))
global_med = out[col].median(skipna=True)
out[col] = out[col].fillna(global_med)
# age按city中位数;全局回落用中位数
if "age" in out.columns and "city" in out.columns:
out["age"] = out.groupby("city")["age"].transform(lambda s: s.fillna(s.median()))
age_med = out["age"].median(skipna=True)
out["age"] = out["age"].fillna(age_med)
# discount若有缺失,统一用中位数(未指定分组)
if "discount" in out.columns:
out["discount"] = out["discount"].fillna(out["discount"].median(skipna=True))
return out
def build_regression_imputer(
df: pd.DataFrame,
target: str,
random_state: int = 42,
) -> Pipeline:
"""
为目标列构建回归填补管道。
数值特征:median填补;类别特征:众数填补+OneHot。
模型:随机森林回归(对非线性和类别特征较稳健)。
"""
# 特征列定义:排除标识列和目标列
# 使用订单时间特征和其他可解释变量
time_features = ["order_year", "order_month", "order_weekday"]
cat_features = [c for c in ["sku", "city", "payment_method"] if c in df.columns]
num_features_all = [c for c in ["qty", "price", "discount", "age"] if c in df.columns]
num_features = [c for c in num_features_all if c != target]
# 组装列转换
transformers = []
if len(num_features) > 0:
transformers.append(("num", SimpleImputer(strategy="median"), num_features))
if len(cat_features) > 0:
transformers.append(
(
"cat",
Pipeline(
steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("ohe", OneHotEncoder(handle_unknown="ignore")),
]
),
cat_features,
)
)
if len(time_features) > 0:
transformers.append(("time", SimpleImputer(strategy="median"), time_features))
preprocessor = ColumnTransformer(transformers=transformers, remainder="drop")
model = RandomForestRegressor(
n_estimators=200,
max_depth=None,
random_state=random_state,
n_jobs=-1,
)
pipe = Pipeline(steps=[("preprocess", preprocessor), ("model", model)])
return pipe
def impute_regression(df: pd.DataFrame, random_state: int = 42) -> pd.DataFrame:
out = df.copy()
# 类别填补优先处理:payment_method用众数,避免建模特征中缺失
out = fill_categorical_mode(out, ["payment_method", "sku", "city"])
# 时间特征
out = add_time_features(out)
# 针对discount的简单填补,避免作为特征时大量缺失
if "discount" in out.columns:
out["discount"] = out["discount"].fillna(out["discount"].median(skipna=True))
targets = [c for c in ["age", "price", "qty"] if c in out.columns]
for target in targets:
# 构建并训练回归模型
pipe = build_regression_imputer(out, target=target, random_state=random_state)
mask_train = out[target].notna()
mask_pred = out[target].isna()
if mask_train.sum() == 0 or mask_pred.sum() == 0:
# 无需训练或无待填补
continue
X_train = out.loc[mask_train]
y_train = out.loc[mask_train, target]
X_pred = out.loc[mask_pred]
pipe.fit(X_train, y_train)
y_hat = pipe.predict(X_pred)
# 合理边界控制:非负;age限定在0-100之间(如需调整可修改)
if target in ["qty", "price"]:
y_hat = np.maximum(y_hat, 0.0)
if target == "age":
y_hat = np.clip(y_hat, 0.0, 100.0)
out.loc[mask_pred, target] = y_hat
return out
def generate_outputs(df: pd.DataFrame, strategies: List[str], out_dir: str, random_state: int):
os.makedirs(out_dir, exist_ok=True)
results: Dict[str, pd.DataFrame] = {}
if "mean" in strategies:
results["mean"] = impute_mean(df)
if "median" in strategies:
results["median"] = impute_median(df)
if "mode" in strategies:
results["mode"] = impute_mode(df)
if "group" in strategies:
results["group"] = impute_group(df)
if "regression" in strategies:
results["regression"] = impute_regression(df, random_state=random_state)
# 输出保存
for name, dfi in results.items():
out_path = os.path.join(out_dir, f"{name}_imputed.csv")
dfi.to_csv(out_path, index=False)
# 缺失情况报告
miss_report = dfi.isna().sum()
print(f"[{name}] 保存至: {out_path}")
print(f"[{name}] 每列剩余缺失数:")
print(miss_report.to_string())
print("-" * 60)
return results
def main():
args = parse_args()
strategies = (
["mean", "median", "mode", "group", "regression"]
if args.strategies.strip().lower() == "all"
else [s.strip().lower() for s in args.strategies.split(",")]
)
zero_cols = [c.strip() for c in args.zero_as_missing.split(",") if c.strip()]
# 读取与标准化
df = load_data(args.input, na_values=DEFAULT_NA_VALUES)
df = convert_types(df)
df = standardize_missing(df, zero_as_missing_cols=zero_cols)
# 一键生成多方案填补
generate_outputs(df, strategies=strategies, out_dir=args.output_dir, random_state=args.random_state)
if __name__ == "__main__":
main()
Usage example
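A typical invocation (sales.csv and ./imputed are placeholder paths; the flags map directly to the argparse options defined above):
pip install pandas numpy scikit-learn
python impute_sales.py --input sales.csv --output-dir ./imputed --strategies all --zero-as-missing qty,price
# Generate only the group-based and regression variants:
python impute_sales.py --input sales.csv --output-dir ./imputed --strategies group,regression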
Implementation notes and recommendations
The script can be reused as a template on other table schemas. For a different dataset you only need to:
- adjust the column lists in convert_types and in each impute_* function (numeric, categorical, and date columns);
- change the grouping keys used in impute_group (here sku and city) to fields that are meaningful for the new data;
- update the target and feature lists in impute_regression and build_regression_imputer;
- pass the appropriate --zero-as-missing columns on the command line.
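As a minimal sketch of that adaptation (the column names below are hypothetical stand-ins, not from the sales table), the per-strategy functions can be parameterized so that only two lists change per dataset:
import pandas as pd
# Hypothetical column lists for a different table; edit these instead of each impute_* function.
NUMERIC_COLS = ["units", "revenue"]
CATEGORICAL_COLS = ["store", "channel"]
def impute_median_generic(df: pd.DataFrame,
                          numeric_cols=NUMERIC_COLS,
                          categorical_cols=CATEGORICAL_COLS) -> pd.DataFrame:
    out = df.copy()
    for col in categorical_cols:          # categorical columns: mode
        mode_vals = out[col].mode(dropna=True)
        if len(mode_vals) > 0:
            out[col] = out[col].fillna(mode_vals.iloc[0])
    for col in numeric_cols:              # numeric columns: median
        out[col] = out[col].fillna(out[col].median(skipna=True))
    return out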
Below is a self-contained Python script that performs a pre-ingestion missingness audit, imputes missing values according to the specified rules, generates a before/after comparison for changed fields, and outputs an anomalies list. It is designed for the daily transaction wide table with columns [txn_id, ts, branch, channel, amount, fee]. Imputation rules:
- amount and fee are filled with the group median (or mean, selectable via --impute-stat) computed over (branch, channel), falling back to the branch-level statistic, then the channel-level statistic, then the global statistic;
- ts is forward-filled within each (branch, channel) group after sorting by ts and txn_id; leading missing timestamps in a group are left missing and surface in the anomalies list;
- txn_id, branch, and channel are never imputed; duplicate txn_id values are flagged as anomalies.
This script also enforces consistent definitions and tracking:
- every imputed cell is tagged with its source (branch+channel, branch, channel, global, or ffill) via *_impute_source and *_was_imputed columns;
- a before/after table records each changed value per txn_id;
- values that became NaN during type coercion are reported separately, so silent data loss stays visible;
- missingness is audited both before ingestion and again after imputation.
You can run: python clean_transactions.py --input /path/to/input.csv --outdir ./out --impute-stat median
Code:
#!/usr/bin/env python3
import argparse
import os
from typing import Tuple, Dict
import pandas as pd
import numpy as np
def compute_missingness(df: pd.DataFrame, cols: list) -> pd.DataFrame:
    total = len(df)
    audit = []
    for c in cols:
        nulls = df[c].isna().sum()
        audit.append({
            "column": c,
            "null_count": int(nulls),
            "total": int(total),
            "null_rate": float(nulls / total if total else np.nan),
        })
    return pd.DataFrame(audit)
def ensure_dtypes(df: pd.DataFrame) -> Tuple[pd.DataFrame, Dict[str, pd.Series]]:
    original = {}
    for col in ["txn_id", "ts", "branch", "channel", "amount", "fee"]:
        if col not in df.columns:
            raise ValueError(f"Missing required column: {col}")
# Preserve originals for anomaly detection
for c in ["ts", "amount", "fee"]:
original[f"orig_{c}"] = df[c].copy()
# Coerce types
df["txn_id"] = df["txn_id"].astype(str)
df["branch"] = df["branch"].astype(str)
df["channel"] = df["channel"].astype(str)
df["ts"] = pd.to_datetime(df["ts"], errors="coerce", utc=False)
df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
df["fee"] = pd.to_numeric(df["fee"], errors="coerce")
return df, original
def build_group_stats(df: pd.DataFrame, stat: str = "median") -> Dict[str, pd.DataFrame]:
    # Use string aggregation names so pandas skips NaN when computing group statistics.
    agg_func = "median" if stat == "median" else "mean"
    use_cols = ["amount", "fee"]
g_bc = df.groupby(["branch", "channel"], dropna=False)[use_cols].agg(agg_func).rename(columns={
"amount": "g_bc_amount", "fee": "g_bc_fee"
})
g_b = df.groupby(["branch"], dropna=False)[use_cols].agg(agg_func).rename(columns={
"amount": "g_b_amount", "fee": "g_b_fee"
})
g_c = df.groupby(["channel"], dropna=False)[use_cols].agg(agg_func).rename(columns={
"amount": "g_c_amount", "fee": "g_c_fee"
})
g_all = df[use_cols].agg(agg_func).to_dict() # {"amount": x, "fee": y}
return {"g_bc": g_bc, "g_b": g_b, "g_c": g_c, "g_all": g_all}
def impute_amount_fee(df: pd.DataFrame, stat: str = "median") -> pd.DataFrame:
    stats = build_group_stats(df, stat=stat)
df = df.merge(stats["g_bc"], how="left", left_on=["branch", "channel"], right_index=True)
df = df.merge(stats["g_b"], how="left", on="branch")
df = df.merge(stats["g_c"], how="left", on="channel")
# Prepare targets
df["amount_impute_source"] = None
df["fee_impute_source"] = None
# Amount
amount_filled = df["amount"].copy()
src = pd.Series(index=df.index, dtype=object)
mask = amount_filled.isna() & df["g_bc_amount"].notna()
amount_filled[mask] = df.loc[mask, "g_bc_amount"]
src[mask] = "branch+channel"
mask = amount_filled.isna() & df["g_b_amount"].notna()
amount_filled[mask] = df.loc[mask, "g_b_amount"]
src[mask] = "branch"
mask = amount_filled.isna() & df["g_c_amount"].notna()
amount_filled[mask] = df.loc[mask, "g_c_amount"]
src[mask] = "channel"
if np.isfinite(stats["g_all"]["amount"]) and not pd.isna(stats["g_all"]["amount"]):
mask = amount_filled.isna()
amount_filled[mask] = stats["g_all"]["amount"]
src[mask] = "global"
df["amount_impute_source"] = src.where(df["amount"].isna(), None)
df["amount_filled"] = amount_filled
df["amount_was_imputed"] = df["amount"].isna() & df["amount_filled"].notna()
# Fee
fee_filled = df["fee"].copy()
src = pd.Series(index=df.index, dtype=object)
mask = fee_filled.isna() & df["g_bc_fee"].notna()
fee_filled[mask] = df.loc[mask, "g_bc_fee"]
src[mask] = "branch+channel"
mask = fee_filled.isna() & df["g_b_fee"].notna()
fee_filled[mask] = df.loc[mask, "g_b_fee"]
src[mask] = "branch"
mask = fee_filled.isna() & df["g_c_fee"].notna()
fee_filled[mask] = df.loc[mask, "g_c_fee"]
src[mask] = "channel"
if np.isfinite(stats["g_all"]["fee"]) and not pd.isna(stats["g_all"]["fee"]):
mask = fee_filled.isna()
fee_filled[mask] = stats["g_all"]["fee"]
src[mask] = "global"
df["fee_impute_source"] = src.where(df["fee"].isna(), None)
df["fee_filled"] = fee_filled
df["fee_was_imputed"] = df["fee"].isna() & df["fee_filled"].notna()
return df
def impute_ts_ffill(df: pd.DataFrame) -> pd.DataFrame:
    # Forward fill within (branch, channel), sorted by ts ascending, then txn_id for deterministic order.
    # Note: initial missing ts in a group remain missing (no backfill).
    order_cols = ["branch", "channel", "ts", "txn_id"]
    df_sorted = df.sort_values(order_cols, na_position="first").copy()
    ts_ffilled = df_sorted.groupby(["branch", "channel"], dropna=False)["ts"].ffill()
    df_sorted["ts_filled"] = ts_ffilled
# Restore original index alignment
df["ts_filled"] = df_sorted.loc[df_sorted.index, "ts_filled"].sort_index()
df["ts_was_imputed"] = df["ts"].isna() & df["ts_filled"].notna()
return df
def build_before_after(df: pd.DataFrame) -> pd.DataFrame:
    changes = []
    for col, filled_col, src_col in [
        ("amount", "amount_filled", "amount_impute_source"),
        ("fee", "fee_filled", "fee_impute_source"),
        ("ts", "ts_filled", None),
    ]:
        # A change is recorded when the filled value differs from the original;
        # a cell that was NaN and stayed NaN is not counted as a change.
        diff_mask = df[col].ne(df[filled_col]) & ~(df[col].isna() & df[filled_col].isna())
        if diff_mask.any():
            tmp = pd.DataFrame({
                "txn_id": df.loc[diff_mask, "txn_id"].astype(str),
                "field": col,
                "before": df.loc[diff_mask, col].astype(str),
                "after": df.loc[diff_mask, filled_col].astype(str),
            })
            if src_col:
                tmp["impute_source"] = df.loc[diff_mask, src_col].astype(str)
            else:
                tmp["impute_source"] = "ffill"
            changes.append(tmp)
    if changes:
        return pd.concat(changes, ignore_index=True)
    else:
        return pd.DataFrame(columns=["txn_id", "field", "before", "after", "impute_source"])
def build_anomalies(df: pd.DataFrame, original: Dict[str, pd.Series]) -> pd.DataFrame:
    anomalies = []
# Duplicated txn_id
dups = df[df["txn_id"].duplicated(keep=False)].copy()
if not dups.empty:
dups["anomaly_type"] = "duplicate_txn_id"
anomalies.append(dups[["txn_id", "anomaly_type"]])
# Rows still missing after imputation in critical fields
still_missing = df[df["ts_filled"].isna() | df["amount_filled"].isna() | df["fee_filled"].isna()].copy()
if not still_missing.empty:
still_missing["anomaly_type"] = "residual_missing_after_imputation"
anomalies.append(still_missing[["txn_id", "anomaly_type"]])
# Coercion-induced NaNs for ts/amount/fee
for col, orig_col in [("ts", "orig_ts"), ("amount", "orig_amount"), ("fee", "orig_fee")]:
coerced_nan_mask = original[orig_col].notna() & df[col].isna()
if coerced_nan_mask.any():
rows = df.loc[coerced_nan_mask, ["txn_id"]].copy()
rows["anomaly_type"] = f"type_coercion_nan_{col}"
anomalies.append(rows)
if anomalies:
out = pd.concat(anomalies, ignore_index=True).drop_duplicates()
return out
else:
return pd.DataFrame(columns=["txn_id", "anomaly_type"])
def imputation_summary(df: pd.DataFrame) -> pd.DataFrame:
    rows = []
    for field, src_col, flag_col in [
        ("amount", "amount_impute_source", "amount_was_imputed"),
        ("fee", "fee_impute_source", "fee_was_imputed"),
    ]:
        # Distribution of sources
        src_counts = df.loc[df[flag_col], src_col].value_counts(dropna=False)
        for src, cnt in src_counts.items():
            rows.append({"field": field, "impute_source": src if src is not None else "none", "count": int(cnt)})
        # Total imputed
        rows.append({"field": field, "impute_source": "TOTAL_IMPUTED", "count": int(df[flag_col].sum())})
    # ts ffill count
    rows.append({"field": "ts", "impute_source": "ffill", "count": int(df["ts_was_imputed"].sum())})
    return pd.DataFrame(rows)
def main():
    parser = argparse.ArgumentParser(description="Clean daily transaction wide table: audit missingness, impute, and output artifacts.")
    parser.add_argument("--input", required=True, help="Input CSV path with columns [txn_id, ts, branch, channel, amount, fee]")
    parser.add_argument("--outdir", required=True, help="Output directory")
    parser.add_argument("--impute-stat", choices=["median", "mean"], default="median", help="Statistic used for amount/fee imputation")
    parser.add_argument("--encoding", default="utf-8", help="CSV encoding")
    parser.add_argument("--sep", default=",", help="CSV separator (default ',')")
    args = parser.parse_args()
os.makedirs(args.outdir, exist_ok=True)
# Load
df_raw = pd.read_csv(args.input, encoding=args.encoding, sep=args.sep, dtype=str)
# Preserve a working numeric/date copy
df = df_raw.copy()
# Pre-ingestion missingness audit (before any coercion/imputation)
pre_ingestion_audit = compute_missingness(df, ["txn_id", "ts", "branch", "channel", "amount", "fee"])
pre_ingestion_audit.to_csv(os.path.join(args.outdir, "audit_missingness_before.csv"), index=False)
# Type coercion and validation
df, original = ensure_dtypes(df)
# Impute amount/fee
df = impute_amount_fee(df, stat=args.impute_stat)
# Forward fill ts within (branch, channel)
df = impute_ts_ffill(df)
# Build comparison table (before vs after) for changed values
before_after = build_before_after(df)
before_after.to_csv(os.path.join(args.outdir, "before_after_changes.csv"), index=False)
# Anomalies
anomalies = build_anomalies(df, original)
anomalies.to_csv(os.path.join(args.outdir, "anomalies.csv"), index=False)
# Imputation summary
summary = imputation_summary(df)
summary.to_csv(os.path.join(args.outdir, "imputation_summary.csv"), index=False)
# Cleaned dataset (with filled values and explicit lineage fields)
cleaned = df.copy()
cleaned["amount"] = cleaned["amount_filled"]
cleaned["fee"] = cleaned["fee_filled"]
cleaned["ts"] = cleaned["ts_filled"]
# Keep key lineage columns; drop helper columns not needed downstream
drop_cols = [
"g_bc_amount", "g_bc_fee", "g_b_amount", "g_b_fee", "g_c_amount", "g_c_fee",
"amount_filled", "fee_filled", "ts_filled",
]
cleaned = cleaned.drop(columns=[c for c in drop_cols if c in cleaned.columns])
cleaned.to_csv(os.path.join(args.outdir, "transactions_cleaned.csv"), index=False)
# Post-processing missingness audit for verification of consistency
post_audit = compute_missingness(cleaned, ["txn_id", "ts", "branch", "channel", "amount", "fee"])
post_audit.to_csv(os.path.join(args.outdir, "audit_missingness_after.csv"), index=False)
# Basic stdout log
print("Completed.")
print(f"Rows: {len(df)}")
print(f"Pre-ingestion missingness audit: {os.path.join(args.outdir, 'audit_missingness_before.csv')}")
print(f"Before/After changes: {os.path.join(args.outdir, 'before_after_changes.csv')}")
print(f"Anomalies: {os.path.join(args.outdir, 'anomalies.csv')}")
print(f"Imputation summary: {os.path.join(args.outdir, 'imputation_summary.csv')}")
print(f"Cleaned dataset: {os.path.join(args.outdir, 'transactions_cleaned.csv')}")
print(f"Post-imputation missingness audit: {os.path.join(args.outdir, 'audit_missingness_after.csv')}")
if __name__ == "__main__":
    pd.set_option("future.no_silent_downcasting", True)
    main()
Notes and rationale:
- The median is the default statistic because it is more robust to extreme transaction amounts than the mean; switch with --impute-stat mean if needed.
- ts is only forward-filled inside a (branch, channel) group and never backfilled, so leading gaps are reported in anomalies.csv rather than guessed.
- Lineage columns (*_impute_source, *_was_imputed) are kept in transactions_cleaned.csv so downstream consumers can exclude or down-weight imputed rows.
- Rows whose ts, amount, or fee became NaN during type coercion are listed as type_coercion_nan_* anomalies, separating data-quality issues from genuine missingness.
- The post-imputation audit (audit_missingness_after.csv) verifies that any remaining missingness matches the anomalies list.
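To make the fallback order concrete, here is a minimal, self-contained illustration on toy data (it uses groupby().transform() for brevity, whereas the script above implements the same idea by merging precomputed group statistics):
import numpy as np
import pandas as pd
# Toy frame: the (A, web) group contains only a missing amount,
# so the branch-level median has to cover it.
toy = pd.DataFrame({
    "branch": ["A", "A", "A", "B"],
    "channel": ["web", "pos", "pos", "web"],
    "amount": [np.nan, 8.0, 6.0, 5.0],
})
bc_median = toy.groupby(["branch", "channel"])["amount"].transform("median")  # 1st fallback level
b_median = toy.groupby("branch")["amount"].transform("median")                # 2nd fallback level
toy["amount_filled"] = toy["amount"].fillna(bc_median).fillna(b_median).fillna(toy["amount"].median())
print(toy)
# Row 0 is filled with 7.0, the median of branch A.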
Below is a commented Python script that performs the following steps:
- coerce column types and drop rows with a missing user_id (they cannot be linked to a user);
- print a missing-value overview and baseline metrics computed on the non-missing data;
- apply three imputation strategies: mean/mode, mode only, and linear interpolation;
- report how many cells each strategy filled and how the key metrics (ARPU, churn rate, plan distribution, login delay) shift against the baseline;
- save the three imputed datasets as CSV files.
Notes: adjust the INPUT_PATH file path (and, if needed, OUTPUT_PREFIX) before running.
The code follows:
import pandas as pd
import numpy as np
# =========================
# 配置
# =========================
INPUT_PATH = "path/to/jiaoxue_yunying.csv" # 修改为实际路径
OUTPUT_PREFIX = "imputed_teaching_ops" # 输出文件名前缀,可按需修改
pd.set_option("display.width", 120)
pd.set_option("display.max_columns", 20)
# =========================
# 工具函数
# =========================
def coerce_dtypes(df: pd.DataFrame) -> pd.DataFrame:
"""
规范各列数据类型:
- user_id 作为标识符:转为字符串;若缺失则无法定位用户,建议剔除
- signup_day/last_login:转为 datetime64[ns]
- plan:保留为字符串/分类
- arpu:转为浮点数
- churn_flag:转为 pandas 可空整型 Int64,仅接受 {0,1}
"""
df = df.copy()
# 标识符:转为字符串
if "user_id" in df.columns:
df["user_id"] = df["user_id"].astype(str).str.strip()
# 空字符串视为缺失
df.loc[df["user_id"].isin(["", "nan", "None"]), "user_id"] = np.nan
# 日期列
for col in ["signup_day", "last_login"]:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors="coerce")
# 分类列
if "plan" in df.columns:
df["plan"] = df["plan"].astype(str).str.strip()
df.loc[df["plan"].isin(["", "nan", "None"]), "plan"] = np.nan
# 数值列
if "arpu" in df.columns:
df["arpu"] = pd.to_numeric(df["arpu"], errors="coerce")
# 二值标记:仅接受 {0,1}
if "churn_flag" in df.columns:
s = df["churn_flag"]
s = s.replace({
True: 1, False: 0,
"True": 1, "False": 0,
"true": 1, "false": 0,
"1": 1, "0": 0
})
s = pd.to_numeric(s, errors="coerce")
s = s.where(s.isin([0, 1]), np.nan)
df["churn_flag"] = s.astype("Int64")
# 缺失 user_id 的行剔除(无法关联用户)
if "user_id" in df.columns:
before = len(df)
df = df[~df["user_id"].isna()].copy()
after = len(df)
if before != after:
print(f"已剔除缺失 user_id 的行数:{before - after}")
return df
def missing_overview(df: pd.DataFrame) -> pd.DataFrame:
"""
返回缺失值概览(数量与占比)。
"""
na_count = df.isna().sum()
na_pct = df.isna().mean().round(4)
overview = pd.DataFrame({"missing_count": na_count, "missing_pct": na_pct})
return overview
def series_mode_safe(s: pd.Series):
"""
安全众数:若存在多个众数,取第一个;若无众数(如全唯一),对数值列回退为中位数,对非数值列保持缺失。
"""
modes = s.mode(dropna=True)
if len(modes) > 0:
return modes.iloc[0]
# 众数不存在时的回退策略
if pd.api.types.is_numeric_dtype(s):
return s.median(skipna=True)
else:
return np.nan # 非数值列不强行回退
def interpolate_datetime_globally(s: pd.Series) -> pd.Series:
"""
对 datetime 列进行线性插值(演示用):
- 将时间转为 int64(纳秒),将 NaT 映射为 NaN
- 按时间值排序后线性插值,再映射回原索引
注意:跨用户的全局插值会引入时间结构假设,仅用于缺失很少的演示;业务上建议基于同一主体或时间序列。
"""
s = s.copy()
    # 转时间戳(纳秒):先经 numpy 转为 int64(NaT 为哨兵值),再把 NaT 位置置为 NaN,
    # 避免 Series.astype("int64") 在含 NaT 时抛出 ValueError
    ts = pd.Series(s.values.astype("int64"), index=s.index, dtype="float64")
    ts = ts.where(~s.isna(), np.nan)
# 按值排序进行插值
order_idx = s.sort_values().index # NaT 自动排在最后
ts_sorted = ts.loc[order_idx]
ts_interp_sorted = ts_sorted.interpolate(method="linear", limit_direction="both")
# 映射回原索引顺序
ts_interp = ts_interp_sorted.reindex(s.index)
# 转回 datetime
s_interp = pd.to_datetime(ts_interp, unit="ns")
return s_interp
def compute_metrics(df: pd.DataFrame) -> dict:
"""
计算用于对比的关键指标:
- arpu_mean, arpu_std
- churn_rate(churn_flag 的均值)
- plan_dist(计划的归一化分布,取前若干项)
- median_login_delay_days(last_login - signup_day 的天数中位数)
"""
metrics = {}
# ARPU
if "arpu" in df.columns:
metrics["arpu_mean"] = df["arpu"].mean(skipna=True)
metrics["arpu_std"] = df["arpu"].std(skipna=True)
# 流失率
if "churn_flag" in df.columns:
# 可空整型的均值默认跳过缺失
metrics["churn_rate"] = df["churn_flag"].mean(skipna=True)
# 计划分布(归一化)
if "plan" in df.columns:
plan_dist = df["plan"].value_counts(normalize=True, dropna=True)
# 转为字典便于打印
metrics["plan_dist"] = plan_dist.to_dict()
# 登录间隔(天)中位数
if set(["signup_day", "last_login"]).issubset(df.columns):
mask = df["signup_day"].notna() & df["last_login"].notna()
if mask.any():
deltas = (df.loc[mask, "last_login"] - df.loc[mask, "signup_day"]).dt.days
metrics["median_login_delay_days"] = float(deltas.median())
else:
metrics["median_login_delay_days"] = np.nan
return metrics
def print_metrics_diff(title: str, base: dict, new: dict):
"""
打印两组指标的差异。
"""
print(f"\n=== {title} 指标对比(与基线差异)===")
keys = sorted(set(base.keys()) | set(new.keys()))
for k in keys:
if k == "plan_dist":
print(f"[{k}]")
base_dist = base.get(k, {})
new_dist = new.get(k, {})
all_plans = sorted(set(base_dist.keys()) | set(new_dist.keys()))
for p in all_plans:
b = base_dist.get(p, 0.0)
n = new_dist.get(p, 0.0)
diff = n - b
print(f" - {p}: 基线={b:.4f}, 新值={n:.4f}, 差异={diff:+.4f}")
else:
b = base.get(k, np.nan)
n = new.get(k, np.nan)
if pd.notna(b) and pd.notna(n):
diff = n - b
print(f"[{k}] 基线={b:.6f}, 新值={n:.6f}, 差异={diff:+.6f}")
else:
print(f"[{k}] 基线={b}, 新值={n}, 差异=不可计算")
# =========================
# 缺失填补策略
# =========================
def impute_mean_mode(df: pd.DataFrame) -> pd.DataFrame:
"""
策略1:数值列用均值;分类/日期用众数。
"""
out = df.copy()
# 数值列:arpu -> 均值
if "arpu" in out.columns:
mean_val = out["arpu"].mean(skipna=True)
out["arpu"] = out["arpu"].fillna(mean_val)
# 二值列:churn_flag -> 众数
if "churn_flag" in out.columns:
mode_val = series_mode_safe(out["churn_flag"])
out["churn_flag"] = out["churn_flag"].fillna(mode_val)
# 分类列:plan -> 众数
if "plan" in out.columns:
mode_val = series_mode_safe(out["plan"])
out["plan"] = out["plan"].fillna(mode_val)
# 日期列:用众数(最常见日期)
for col in ["signup_day", "last_login"]:
if col in out.columns:
mode_val = series_mode_safe(out[col])
out[col] = out[col].fillna(mode_val)
return out
def impute_mode_only(df: pd.DataFrame) -> pd.DataFrame:
"""
策略2:所有列统一用众数(数值列也用众数;众数缺失则回退为中位数/保持缺失)。
"""
out = df.copy()
for col in out.columns:
mode_val = series_mode_safe(out[col])
out[col] = out[col].fillna(mode_val)
return out
def impute_linear_interpolation(df: pd.DataFrame) -> pd.DataFrame:
"""
策略3:连续型列用线性插值,离散型列用前后填充+众数兜底。
- arpu:线性插值
- signup_day/last_login:转换为时间戳后线性插值(演示用)
- plan/churn_flag:先前向填充,再后向填充,最后用众数兜底
"""
out = df.copy()
# 连续型:ARPU 线性插值(按原索引顺序)
if "arpu" in out.columns:
out["arpu"] = out["arpu"].interpolate(method="linear", limit_direction="both")
# 日期列:全局时间序列线性插值(演示用,谨慎使用)
for col in ["signup_day", "last_login"]:
if col in out.columns:
out[col] = interpolate_datetime_globally(out[col])
# 离散型:先 ffill/bfill 后众数兜底
for col in ["plan", "churn_flag"]:
if col in out.columns:
out[col] = out[col].ffill().bfill()
mode_val = series_mode_safe(out[col])
out[col] = out[col].fillna(mode_val)
return out
def count_filled_cells(original: pd.DataFrame, imputed: pd.DataFrame) -> pd.Series:
"""
统计各列本次填补的单元格数量(原始缺失 - 填补后缺失)。
"""
return original.isna().sum() - imputed.isna().sum()
# =========================
# 主流程
# =========================
def main():
# 读取数据
df = pd.read_csv(INPUT_PATH)
# 类型规范化与基本清洗
df = coerce_dtypes(df)
# 检查列是否齐全
required_cols = ["user_id", "signup_day", "last_login", "plan", "arpu", "churn_flag"]
missing_cols = [c for c in required_cols if c not in df.columns]
if missing_cols:
raise ValueError(f"缺失必要列:{missing_cols}")
# 缺失概览
print("\n=== 缺失值概览(清洗后原始数据)===")
overview = missing_overview(df)
print(overview)
# 基线指标(不做填补,直接按现有非缺失数据计算)
base_metrics = compute_metrics(df)
print("\n=== 基线指标(按非缺失计算)===")
for k, v in base_metrics.items():
if k == "plan_dist":
print(f"[{k}] {v}")
else:
print(f"[{k}] {v}")
# 策略1:均值/众数
df_mean_mode = impute_mean_mode(df)
mm_metrics = compute_metrics(df_mean_mode)
mm_filled = count_filled_cells(df, df_mean_mode)
print("\n=== 策略1:均值/众数 填补量(各列)===")
print(mm_filled)
print_metrics_diff("策略1:均值/众数", base_metrics, mm_metrics)
# 策略2:纯众数
df_mode = impute_mode_only(df)
mode_metrics = compute_metrics(df_mode)
mode_filled = count_filled_cells(df, df_mode)
print("\n=== 策略2:纯众数 填补量(各列)===")
print(mode_filled)
print_metrics_diff("策略2:纯众数", base_metrics, mode_metrics)
# 策略3:线性插值
df_interp = impute_linear_interpolation(df)
interp_metrics = compute_metrics(df_interp)
interp_filled = count_filled_cells(df, df_interp)
print("\n=== 策略3:线性插值 填补量(各列)===")
print(interp_filled)
print_metrics_diff("策略3:线性插值", base_metrics, interp_metrics)
# 可选:保存结果
df_mean_mode.to_csv(f"{OUTPUT_PREFIX}_mean_mode.csv", index=False)
df_mode.to_csv(f"{OUTPUT_PREFIX}_mode.csv", index=False)
df_interp.to_csv(f"{OUTPUT_PREFIX}_interpolate.csv", index=False)
print("\n已输出三种策略的填补结果到当前目录。")
if __name__ == "__main__":
main()
Usage and interpretation notes:
- Rows with a missing user_id are removed during cleaning because they cannot be attributed to a user.
- Baseline metrics are computed on non-missing data only, so each strategy's metric differences show how strongly the imputation distorts ARPU, churn rate, the plan distribution, and the median login delay.
- The global linear interpolation of dates is for demonstration only; in practice, interpolate within the same subject or along a genuine time series.
- Compare the per-column fill counts and metric shifts, then choose the strategy whose assumptions best match the business meaning of each field.
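For example, after running the script you can compare the strategies side by side with a few lines (a sketch; it assumes the three CSVs were written to the current directory with the default OUTPUT_PREFIX):
import pandas as pd
# Churn rate implied by each strategy's output file.
for name in ["mean_mode", "mode", "interpolate"]:
    df = pd.read_csv(f"imputed_teaching_ops_{name}.csv")
    print(name, round(df["churn_flag"].mean(), 4))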
Using missing-value cleaning as the entry point, this prompt gives data analysis, algorithm, and BI teams a fast path from requirements to a runnable script: the AI acts as a data-mining expert and, based on the dataset characteristics and business constraints you provide, reliably produces well-structured Python cleaning scripts that can be dropped straight into a pipeline. It standardizes processing rules, reduces manual errors, shortens the data-preparation cycle, improves the accuracy of model training and reporting, and supports responses in multiple languages for cross-team collaboration and delivery.
Generate missing-value cleaning scripts for e-commerce, finance, or operations data in one step, quickly try out multiple imputation schemes, build up reusable templates, and improve the credibility of modeling and reporting.
Complete the missing-rate audit and automatic imputation before data is loaded, output a before/after comparison table and an anomalies list, keep dashboard definitions consistent, and reduce time spent on manual fixes.
Use the commented script to understand common imputation methods, run it safely to see the impact, and move quickly from the basics to hands-on project work.
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing straightforward. Suitable for developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client so that your AI application calls the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between different AI tools.