热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
根据数据集特征和清理需求,自动生成可执行的Python脚本,精准检测并处理缺失值,支持多种数据类型和清理策略,高效完成数据预处理,为后续分析和建模提供可靠基础。
以下为可执行的 Python 脚本,按要求实现对电商订单明细数据集 transactions.csv 的缺失值与异常值清理,支持命令行参数,打印缺失分布报告与填充摘要,并包含中英双语注释。可直接嵌入 ETL 流水线。
使用示例: python clean_transactions_missing.py --input /path/transactions.csv --output /path/transactions_clean.csv --log-level INFO --seed 42
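如需嵌入 ETL 流水线,可参考下面的示意片段(路径与参数仅为示例,非脚本本身的一部分),以子进程方式调用脚本并在失败时中止流水线:
# 示意:在 ETL 步骤中调用清理脚本 / Invoke the cleaning script as an ETL step (paths are illustrative)
import subprocess
import sys

def run_cleaning_step(input_path: str, output_path: str) -> None:
    cmd = [
        sys.executable, "clean_transactions_missing.py",
        "--input", input_path,
        "--output", output_path,
        "--log-level", "INFO",
        "--seed", "42",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise RuntimeError(f"Cleaning step failed (清理步骤失败): {result.stderr}")

run_cleaning_step("/path/transactions.csv", "/path/transactions_clean.csv")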
脚本 clean_transactions_missing.py:
#!/usr/bin/env python3
""" Data cleaning script for transactions.csv
功能概述(中文):
Overview (English):
import argparse
import logging
import sys
from typing import Dict, Tuple

import numpy as np
import pandas as pd
UNKNOWN = "UNKNOWN" # 用于类别型未知值 / sentinel for unknown categorical NO_COUPON = "NO_COUPON" # 用于无优惠券 / sentinel for no coupon
def parse_args():
    parser = argparse.ArgumentParser(
        description="Clean missing and anomalous values in transactions.csv (电商订单明细缺失/异常清理)"
    )
    parser.add_argument("--input", required=True, help="Input CSV file path (输入文件路径)")
    parser.add_argument("--output", required=True, help="Output CSV file path (输出文件路径)")
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Logging level (日志级别)"
    )
    parser.add_argument(
        "--seed", type=int, default=42, help="Random seed for reproducibility (随机种子)"
    )
    return parser.parse_args()
def setup_logging(level: str):
    logging.basicConfig(
        level=getattr(logging, level),
        format="%(asctime)s %(levelname)s %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
def read_data(path: str) -> pd.DataFrame:
    """
    读取 CSV,设置合适 dtype,并将时间列延后转换以做严格校验。
    Read CSV with dtypes; convert datetime columns afterward with strict coercion.
    """
    # Define dtype mapping for known columns to improve memory usage and consistency
    dtype_map = {
        "order_id": "string",
        "user_id": "string",
        # parse date columns as string first, then to datetime with errors='coerce'
        "order_time": "string",
        "delivery_date": "string",
        "order_amount": "float64",
        "discount_rate": "float64",
        "payment_method": "string",
        "province": "string",
        "sku_count": "float64",  # use float during cleaning; cast to int later
        "coupon_code": "string",
        "source_channel": "string",
    }
# Treat common placeholders and empty strings as NaN
na_vals = ["", "NA", "N/A", "na", "n/a", "Null", "NULL", "null", "None", "none"]
df = pd.read_csv(
path,
dtype=dtype_map,
na_values=na_vals,
keep_default_na=True,
low_memory=True
)
# Strip whitespace for string-like columns to normalize
for col in ["order_id", "user_id", "payment_method", "province", "coupon_code", "source_channel"]:
if col in df.columns:
df[col] = df[col].astype("string")
df[col] = df[col].str.strip()
# Convert datetime with coercion (invalid -> NaT)
if "order_time" in df.columns:
df["order_time"] = pd.to_datetime(df["order_time"], errors="coerce", utc=False)
if "delivery_date" in df.columns:
df["delivery_date"] = pd.to_datetime(df["delivery_date"], errors="coerce", utc=False)
return df
def assert_primary_key(df: pd.DataFrame):
    """
    验证主键非空与唯一;不满足则报错退出。
    Assert non-null and uniqueness of primary key; abort if violated.
    """
    if "order_id" not in df.columns:
        logging.error("Missing required column: order_id (缺少必需字段 order_id)")
        sys.exit(1)
if df["order_id"].isna().any():
n = int(df["order_id"].isna().sum())
logging.error("order_id contains nulls: %d (order_id 存在空值,无法修复)", n)
sys.exit(1)
dup = df["order_id"].duplicated(keep=False)
if dup.any():
n = int(dup.sum())
logging.error("order_id has duplicates: %d (order_id 存在重复,无法修复)", n)
sys.exit(1)
def build_missing_masks(df: pd.DataFrame) -> Dict[str, pd.Series]:
    """
    构造列级缺失/异常掩码(业务规则)以报告与清理。
    Build missing/anomaly masks (business rules) for reporting and cleaning.
    """
    masks = {}
# order_amount: NaN or negative considered missing/anomalous
if "order_amount" in df.columns:
masks["order_amount"] = df["order_amount"].isna() | (df["order_amount"] < 0)
# discount_rate: NaN or outside [0,1] considered missing/anomalous
if "discount_rate" in df.columns:
dr = df["discount_rate"]
masks["discount_rate"] = dr.isna() | (~dr.between(0, 1))
# payment_method: NaN considered missing
if "payment_method" in df.columns:
pm = df["payment_method"]
masks["payment_method"] = pm.isna()
# province: NaN considered missing
if "province" in df.columns:
pv = df["province"]
masks["province"] = pv.isna()
# delivery_date: NaT or delivery_date < order_time considered invalid
if "delivery_date" in df.columns:
dd = df["delivery_date"]
if "order_time" in df.columns:
ot = df["order_time"]
masks["delivery_date"] = dd.isna() | (dd < ot)
else:
masks["delivery_date"] = dd.isna()
# coupon_code: NaN considered missing (business-as-usual)
if "coupon_code" in df.columns:
cc = df["coupon_code"]
masks["coupon_code"] = cc.isna()
# order_time: NaT considered missing
if "order_time" in df.columns:
ot = df["order_time"]
masks["order_time"] = ot.isna()
# sku_count: NaN or == 0 considered missing/anomalous
if "sku_count" in df.columns:
sc = df["sku_count"]
masks["sku_count"] = sc.isna() | (sc == 0)
# source_channel: NaN considered missing (not expected but safe)
if "source_channel" in df.columns:
scn = df["source_channel"]
masks["source_channel"] = scn.isna()
return masks
def print_missing_report(df: pd.DataFrame, masks: Dict[str, pd.Series], title: str):
    """
    打印缺失/异常分布报告(列级计数与比例)。
    Print missing/anomaly distribution report (counts and percentages per column).
    """
    logging.info("=== %s ===", title)
    n = len(df)
    for col, m in masks.items():
        cnt = int(m.sum())
        pct = (cnt / n) * 100 if n > 0 else 0.0
        logging.info(
            "Column: %s | Missing/Anomalous: %d (%.4f%%)", col, cnt, pct
        )
def impute_values(df: pd.DataFrame, masks: Dict[str, pd.Series], rng: np.random.Generator) -> Dict[str, str]:
    """
    依据指定策略执行清理/填充,并返回每列的填充策略摘要。
    Perform cleaning/imputation per specified strategies. Return summary per column.
    """
    summary = {}
# 1) order_amount: numeric median fill; treat negatives as missing
if "order_amount" in df.columns and "order_amount" in masks:
mask = masks["order_amount"]
valid = df.loc[~df["order_amount"].isna() & (df["order_amount"] >= 0), "order_amount"]
if len(valid) == 0:
median_val = 0.0
logging.warning("No valid order_amount found; using 0.0 as fallback median (未发现有效 order_amount,回退至 0.0)")
else:
median_val = float(valid.median())
fill_cnt = int(mask.sum())
df.loc[mask, "order_amount"] = median_val
summary["order_amount"] = f"numeric median fill -> {median_val} | filled: {fill_cnt}"
# 2) discount_rate: mark out-of-range as missing, median fill, then clip to [0,1]
if "discount_rate" in df.columns and "discount_rate" in masks:
mask = masks["discount_rate"]
valid = df.loc[df["discount_rate"].between(0, 1), "discount_rate"]
if len(valid) == 0:
median_val = 0.0
logging.warning("No valid discount_rate found; using 0.0 as fallback median (未发现有效 discount_rate,回退至 0.0)")
else:
median_val = float(valid.median())
fill_cnt = int(mask.sum())
# set invalid to NaN then fill for clarity
invalid_mask = (~df["discount_rate"].between(0, 1)) & df["discount_rate"].notna()
if invalid_mask.any():
df.loc[invalid_mask, "discount_rate"] = np.nan
df.loc[mask, "discount_rate"] = median_val
# clip final values
df["discount_rate"] = df["discount_rate"].clip(lower=0.0, upper=1.0)
summary["discount_rate"] = f"numeric median fill -> {median_val} + clip[0,1] | filled: {fill_cnt}"
# 3) payment_method: categorical sentinel fill 'UNKNOWN'
if "payment_method" in df.columns and "payment_method" in masks:
mask = masks["payment_method"]
fill_cnt = int(mask.sum())
df.loc[mask, "payment_method"] = UNKNOWN
summary["payment_method"] = f"categorical sentinel fill -> {UNKNOWN} | filled: {fill_cnt}"
# 4) province: categorical sentinel fill 'UNKNOWN'
if "province" in df.columns and "province" in masks:
mask = masks["province"]
fill_cnt = int(mask.sum())
df.loc[mask, "province"] = UNKNOWN
summary["province"] = f"categorical sentinel fill -> {UNKNOWN} | filled: {fill_cnt}"
# 5) source_channel: categorical sentinel fill 'UNKNOWN' (if any missing)
if "source_channel" in df.columns and "source_channel" in masks:
mask = masks["source_channel"]
fill_cnt = int(mask.sum())
if fill_cnt > 0:
df.loc[mask, "source_channel"] = UNKNOWN
summary["source_channel"] = f"categorical sentinel fill -> {UNKNOWN} | filled: {fill_cnt}"
# 6) coupon_code: categorical sentinel fill 'NO_COUPON' (business-as-usual empty)
if "coupon_code" in df.columns and "coupon_code" in masks:
mask = masks["coupon_code"]
fill_cnt = int(mask.sum())
df.loc[mask, "coupon_code"] = NO_COUPON
summary["coupon_code"] = f"categorical sentinel fill -> {NO_COUPON} | filled: {fill_cnt}"
# 7) sku_count: treat NaN or 0 as missing; fill with median of valid (>=1), round to nearest int >=1
if "sku_count" in df.columns and "sku_count" in masks:
mask = masks["sku_count"]
valid = df.loc[df["sku_count"].notna() & (df["sku_count"] >= 1), "sku_count"]
if len(valid) == 0:
median_int = 1
logging.warning("No valid sku_count found; using 1 as fallback median (未发现有效 sku_count,回退至 1)")
else:
median_val = float(valid.median())
# round half up to nearest integer, enforce >=1
median_int = int(np.floor(median_val + 0.5))
median_int = max(median_int, 1)
fill_cnt = int(mask.sum())
df.loc[mask, "sku_count"] = median_int
# cast to int
try:
df["sku_count"] = df["sku_count"].astype(np.int64)
except Exception:
# fallback to pandas nullable integer if needed
df["sku_count"] = df["sku_count"].astype("Int64")
summary["sku_count"] = f"numeric median fill (rounded) -> {median_int} | filled: {fill_cnt}"
# 8) order_time: forward fill then backward fill (preserve order, avoid reordering)
if "order_time" in df.columns and "order_time" in masks:
mask = masks["order_time"]
before_missing = int(mask.sum())
if before_missing > 0:
df["order_time"] = df["order_time"].ffill().bfill()
after_missing = int(df["order_time"].isna().sum())
summary["order_time"] = f"forward fill then backward fill | filled: {before_missing - after_missing} | remaining: {after_missing}"
# 9) delivery_date: business-meaningful missing; do NOT impute; invalidate dd<ot to NaT
if "delivery_date" in df.columns and "delivery_date" in masks:
mask = masks["delivery_date"]
# Only fix invalid dd<ot -> NaT; keep genuine missing as is (未出库)
if "order_time" in df.columns:
invalid_mask = df["delivery_date"].notna() & df["order_time"].notna() & (df["delivery_date"] < df["order_time"])
invalid_cnt = int(invalid_mask.sum())
if invalid_cnt > 0:
df.loc[invalid_mask, "delivery_date"] = pd.NaT
# Summarize without filling missing by design
total_missing_like = int(mask.sum())
summary["delivery_date"] = f"no fill (business-null kept); invalid (delivery<order) coerced to NaT if any | affected: {total_missing_like}"
return summary
def validate_time_monotonic(df: pd.DataFrame):
    """
    验证 order_time 非递减(输入已按时间有序;本脚本不改变顺序)。
    Validate non-decreasing order_time. The script does not reorder rows.
    """
    if "order_time" not in df.columns:
        return
    ot = df["order_time"]
    # Allow equal timestamps; ensure not decreasing.
    # If any violation detected, log warning but do not reorder.
    decreasing = (ot.shift(-1) < ot).fillna(False)
    if decreasing.any():
        n = int(decreasing.sum())
        logging.warning(
            "Detected %d instances of decreasing order_time after fill (填充后出现 %d 个时间逆序). Input should be time-ordered.",
            n, n
        )
def print_imputation_summary(summary: Dict[str, str]):
    """
    打印每个列的填充策略与影响行数摘要。
    Print per-column imputation strategy and affected row counts.
    """
    logging.info("=== Imputation Summary (填充摘要) ===")
    for col in sorted(summary.keys()):
        logging.info("Column: %s | %s", col, summary[col])
def main():
    args = parse_args()
    setup_logging(args.log_level)
    np.random.seed(args.seed)
    rng = np.random.default_rng(args.seed)
logging.info("Loading data from: %s", args.input)
df = read_data(args.input)
# Validate primary key constraints
assert_primary_key(df)
# Build masks and report before cleaning
masks_before = build_missing_masks(df)
print_missing_report(df, masks_before, title="Missing/Anomaly Report BEFORE Cleaning (清理前缺失/异常分布)")
# Perform cleaning/imputation
    # Coerce delivery_date earlier than order_time to NaT before building masks,
    # so the missing/anomaly report and imputation summary stay accurate (先将 delivery_date<order_time 置为 NaT)
if "delivery_date" in df.columns and "order_time" in df.columns:
invalid_mask_dd = df["delivery_date"].notna() & df["order_time"].notna() & (df["delivery_date"] < df["order_time"])
invalid_cnt_dd = int(invalid_mask_dd.sum())
if invalid_cnt_dd > 0:
logging.info("Coercing invalid delivery_date < order_time to NaT: %d rows (将 delivery_date<order_time 置为 NaT)", invalid_cnt_dd)
df.loc[invalid_mask_dd, "delivery_date"] = pd.NaT
# Rebuild masks after this correction so imputation summary is accurate
masks_before = build_missing_masks(df)
summary = impute_values(df, masks_before, rng)
# Validate time non-decreasing after fill (no reordering)
validate_time_monotonic(df)
# Report after cleaning
masks_after = build_missing_masks(df)
print_missing_report(df, masks_after, title="Missing/Anomaly Report AFTER Cleaning (清理后缺失/异常分布)")
# Print imputation summary
print_imputation_summary(summary)
# Ensure column types are consistent for output
# Keep datetime columns as isoformat; pandas will handle formatting
# Ensure categorical-like columns remain strings (not categories) to avoid downstream schema surprises
for col in ["payment_method", "province", "coupon_code", "source_channel", "user_id", "order_id"]:
if col in df.columns:
df[col] = df[col].astype("string")
# Write output
logging.info("Writing cleaned data to: %s", args.output)
df.to_csv(args.output, index=False)
logging.info("Done. Rows: %d, Columns: %d", df.shape[0], df.shape[1])
if name == "main": main()
实现说明(要点):
下面提供一个可运行的 Python 脚本,用于对住院就诊记录(patient_visits.parquet)执行缺失值清理,满足以下要求:
使用说明:
代码(保存为 clean_patient_visits.py):
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
清理住院就诊记录缺失值的脚本
功能概要:
- 加载 Parquet 数据
- 统一类型与基础清洗(去重、范围校验、哨兵值处理)
- 按患者-时间顺序对选定列进行前向填充(避免目标泄露)
- 可选 KNN 插补数值列(不含目标、键、日期等)
- 数值列用中位数兜底,类别列用众数兜底
- 可选对指定列执行模型预测插补(RandomForest),特征不包含 readmission_30d
- 可选删除高缺失列(保护 visit_id, patient_id, diagnosis_code, admission_date, discharge_date, readmission_30d)
- 输出:清洗后的数据、清洗摘要(JSON/CSV)、字段字典(JSON/CSV)、生效配置(JSON)、日志
运行示例:
python clean_patient_visits.py \
--input patient_visits.parquet \
--output-dir ./cleaned \
--seed 42 \
--enable-knn \
--n-neighbors 5 \
--ffill-cols BMI,smoker_status \
--drop-threshold 0.95 \
--model-impute-cols "" \
--save-csv 0
注意:
- 脚本不会在任何插补步骤中使用 readmission_30d 作为特征(避免目标泄露)
"""
import argparse
import json
import logging
import os
import sys
import time
import random
from datetime import datetime
import numpy as np
import pandas as pd
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor, RandomForestClassifier
from sklearn.utils.validation import check_is_fitted
# ------------------------------
# 实用函数
# ------------------------------
def setup_logging(output_dir: str, log_level: str = "INFO"):
os.makedirs(output_dir, exist_ok=True)
log_path = os.path.join(output_dir, f"cleaning_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log")
logger = logging.getLogger()
logger.setLevel(getattr(logging, log_level.upper(), logging.INFO))
fmt = logging.Formatter("[%(asctime)s] %(levelname)s - %(message)s")
fh = logging.FileHandler(log_path, encoding="utf-8")
fh.setLevel(getattr(logging, log_level.upper(), logging.INFO))
fh.setFormatter(fmt)
logger.addHandler(fh)
sh = logging.StreamHandler(sys.stdout)
sh.setLevel(getattr(logging, log_level.upper(), logging.INFO))
sh.setFormatter(fmt)
logger.addHandler(sh)
logging.info(f"日志输出:{log_path}")
def set_global_seed(seed: int = 42):
np.random.seed(seed)
random.seed(seed)
try:
import torch # noqa
import torch.backends.cudnn as cudnn # noqa
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
cudnn.deterministic = True
cudnn.benchmark = False
except Exception:
pass
def load_parquet(path: str) -> pd.DataFrame:
try:
df = pd.read_parquet(path, engine="pyarrow")
except Exception as e:
logging.warning(f"pyarrow 读取失败,尝试 fastparquet。错误:{e}")
try:
df = pd.read_parquet(path, engine="fastparquet")
except Exception as ee:
logging.error(f"无法读取 Parquet 文件:{ee}")
raise
return df
def to_datetime_safe(df: pd.DataFrame, cols):
for c in cols:
if c in df.columns:
df[c] = pd.to_datetime(df[c], errors="coerce")
return df
def compute_missing_rates(df: pd.DataFrame) -> pd.Series:
return df.isna().mean().sort_values(ascending=False)
def detect_column_roles(df: pd.DataFrame):
# 预定义角色
key_cols = [c for c in ["visit_id", "patient_id"] if c in df.columns]
target_col = "readmission_30d" if "readmission_30d" in df.columns else None
date_cols = [c for c in ["admission_date", "discharge_date"] if c in df.columns]
# 文本列(自由文本)
    text_cols = [c for c in df.columns if "free_text" in c.lower() or (df[c].dtype == "object" and c.lower().endswith("notes"))]
# 诊断列
diag_cols = [c for c in ["diagnosis_code"] if c in df.columns]
# 推断类别列:object 或 pandas Categorical
cat_cols = [c for c in df.columns if c not in key_cols + diag_cols + (date_cols or []) and
(str(df[c].dtype) == "object" or "category" in str(df[c].dtype))]
# 指定一些已知类别列加入(如果未被检测)
for c in ["gender", "smoker_status"]:
if c in df.columns and c not in cat_cols and c not in key_cols:
cat_cols.append(c)
# 数值列
num_cols = [c for c in df.select_dtypes(include=[np.number]).columns if c not in (key_cols + ([target_col] if target_col else []))]
# 保护列(不删除)
protected_cols = list(set(key_cols + diag_cols + date_cols + ([target_col] if target_col else [])))
roles = {
"key_cols": key_cols,
"target_col": target_col,
"date_cols": date_cols,
"diag_cols": diag_cols,
"text_cols": text_cols,
"cat_cols": cat_cols,
"num_cols": num_cols,
"protected_cols": protected_cols,
}
return roles
def enforce_types_and_basic_rules(df: pd.DataFrame):
# 主键为字符串
for c in ["visit_id", "patient_id"]:
if c in df.columns:
df[c] = df[c].astype(str)
# 日期
df = to_datetime_safe(df, [c for c in ["admission_date", "discharge_date"] if c in df.columns])
# 范围规则:age [0, 120],BMI>0,sbp/dbp > 0
if "age" in df.columns:
df.loc[(df["age"] < 0) | (df["age"] > 120), "age"] = np.nan
if "BMI" in df.columns:
df.loc[(df["BMI"] <= 0) | (df["BMI"] > 1000), "BMI"] = np.nan # 上界极大值仅用于屏蔽异常
for bp in ["sbp", "dbp"]:
if bp in df.columns:
df.loc[(df[bp] <= 0) | (df[bp] > 4000), bp] = np.nan # 宽松上界
return df
def apply_lab_sentinel_to_nan(df: pd.DataFrame):
# 对 lab_* 列,将 0 和 -1 视为缺失
lab_cols = [c for c in df.columns if c.startswith("lab_")]
for c in lab_cols:
df.loc[df[c].isin([0, -1]), c] = np.nan
# 显式处理常见实验室列
for c in ["lab_glucose", "lab_creatinine"]:
if c in df.columns:
df.loc[df[c].isin([0, -1]), c] = np.nan
return df
def drop_high_missing_columns(df: pd.DataFrame, threshold: float, protected_cols):
missing = compute_missing_rates(df)
drop_candidates = [c for c, r in missing.items() if r > threshold and c not in protected_cols]
kept_before = df.shape[1]
df = df.drop(columns=drop_candidates, errors="ignore")
logging.info(f"删除高缺失列(阈值 {threshold}):{drop_candidates},删除列数:{kept_before - df.shape[1]}")
return df, drop_candidates
def forward_fill_within_patient(df: pd.DataFrame, group_col: str, order_col: str, cols: list):
cols = [c for c in cols if c in df.columns]
if not cols or group_col not in df.columns or order_col not in df.columns:
return df, []
# 排序后分组前向填充
df = df.sort_values([group_col, order_col])
filled_counts = {}
for c in cols:
before = df[c].isna().sum()
df[c] = df.groupby(group_col, sort=False)[c].ffill()
after = df[c].isna().sum()
filled_counts[c] = int(before - after)
logging.info(f"分组前向填充完成:{filled_counts}")
return df, filled_counts
def knn_impute_numeric(df: pd.DataFrame, num_cols: list, exclude_cols: list, n_neighbors: int = 5, scale: bool = True, seed: int = 42):
use_cols = [c for c in num_cols if c not in exclude_cols and c in df.columns]
if not use_cols:
return df, {}
# 若缺失很少则可能无需 KNN
if df[use_cols].isna().sum().sum() == 0:
logging.info("数值列无缺失,跳过 KNN 插补。")
return df, {c: 0 for c in use_cols}
# KNNImputer 是确定性的(同数据与顺序),设置随机种子以统一其他流程
imputer = KNNImputer(n_neighbors=n_neighbors, weights="uniform", metric="nan_euclidean")
filled_counts = {}
values_before = df[use_cols].isna().sum().to_dict()
if scale:
scaler = StandardScaler(with_mean=True, with_std=True)
X = df[use_cols].values
        # 为兼容不同 scikit-learn 版本,这里不依赖 StandardScaler 对 NaN 的处理
        # 简化实现:对每列手动标准化(忽略 NaN)
X_scaled = np.zeros_like(X, dtype=float)
means = {}
stds = {}
for idx, c in enumerate(use_cols):
col = df[c].astype(float).values
m = np.nanmean(col)
s = np.nanstd(col)
if not np.isfinite(s) or s == 0:
s = 1.0
means[c] = m
stds[c] = s
X_scaled[:, idx] = (col - m) / s
X_imp = imputer.fit_transform(X_scaled)
# 逆缩放
for idx, c in enumerate(use_cols):
X_imp[:, idx] = X_imp[:, idx] * stds[c] + means[c]
df.loc[:, use_cols] = X_imp
else:
df.loc[:, use_cols] = imputer.fit_transform(df[use_cols].values)
values_after = df[use_cols].isna().sum().to_dict()
for c in use_cols:
filled_counts[c] = int(values_before.get(c, 0) - values_after.get(c, 0))
logging.info(f"KNN 插补完成,邻居数={n_neighbors},插补计数:{filled_counts}")
return df, filled_counts
def median_mode_fallback(df: pd.DataFrame, num_cols: list, cat_cols: list):
filled = {}
# 数值中位数兜底
for c in [x for x in num_cols if x in df.columns]:
before = int(df[c].isna().sum())
if before > 0:
med = df[c].median()
df[c] = df[c].fillna(med)
after = int(df[c].isna().sum())
filled[c] = before - after
# 类别众数兜底
for c in [x for x in cat_cols if x in df.columns]:
before = int(df[c].isna().sum())
if before > 0:
try:
mode_val = df[c].mode(dropna=True).iloc[0]
except Exception:
mode_val = None
if mode_val is not None:
df[c] = df[c].fillna(mode_val)
after = int(df[c].isna().sum())
filled[c] = filled.get(c, 0) + (before - after)
logging.info(f"中位数/众数兜底插补完成:{filled}")
return df, filled
def model_impute_columns(df: pd.DataFrame, cols: list, roles: dict, seed: int = 42, n_estimators: int = 200):
cols = [c for c in cols if c in df.columns]
if not cols:
return df, {}
key_cols = roles["key_cols"]
date_cols = roles["date_cols"]
target_col = roles["target_col"]
diag_cols = roles["diag_cols"]
protected_exclude = list(set((key_cols or []) + (date_cols or []) + (diag_cols or []) + ([target_col] if target_col else [])))
# 特征候选:除去待插补列本身、保护列
feats_all = [c for c in df.columns if c not in protected_exclude]
filled_info = {}
for col in cols:
if col not in df.columns:
continue
        # 记录当前缺失掩码(模型插补在中位数/众数兜底之前执行,此处即为待预测的缺失位置)
        original_missing_mask = df[col].isna()
# 如果此前已经完全无缺失,不需要模型插补
if original_missing_mask.sum() == 0:
filled_info[col] = 0
continue
# 构建特征列表:移除目标列本身
features = [c for c in feats_all if c != col and c in df.columns]
# 将日期列剔除(已在 protected_exclude)
# 进一步剔除纯文本列(无法直接编码;若有 free_text_notes,按上游阈值可能已删除)
features = [c for c in features if not (df[c].dtype == "object" and c.lower().endswith("notes"))]
# 拆分特征类型
num_feats = [c for c in features if pd.api.types.is_numeric_dtype(df[c])]
cat_feats = [c for c in features if (str(df[c].dtype) == "object" or "category" in str(df[c].dtype))]
# 训练/预测数据
not_missing_idx = (~original_missing_mask).values
missing_idx = original_missing_mask.values
X_train = df.loc[not_missing_idx, features]
X_pred = df.loc[missing_idx, features]
y_train = df.loc[not_missing_idx, col]
if y_train.isna().sum() > 0:
# 若训练标签仍有 NaN,跳过该列
logging.warning(f"列 {col} 的训练标签仍有 NaN,跳过模型插补。")
filled_info[col] = 0
continue
# 构建预处理与模型
num_transformer = Pipeline(steps=[
("imputer", SimpleImputer(strategy="median"))
])
cat_transformer = Pipeline(steps=[
("imputer", SimpleImputer(strategy="most_frequent")),
("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
])
preprocessor = ColumnTransformer(
transformers=[
("num", num_transformer, num_feats),
("cat", cat_transformer, cat_feats)
],
remainder="drop"
)
if pd.api.types.is_numeric_dtype(df[col]):
estimator = RandomForestRegressor(
n_estimators=n_estimators,
random_state=seed,
n_jobs=-1
)
else:
# 将 y_train 转为类别
y_train = y_train.astype(str)
estimator = RandomForestClassifier(
n_estimators=n_estimators,
random_state=seed,
n_jobs=-1,
class_weight="balanced"
)
pipe = Pipeline(steps=[
("pre", preprocessor),
("model", estimator)
])
try:
pipe.fit(X_train, y_train)
preds = pipe.predict(X_pred)
# 回写预测值
df.loc[missing_idx, col] = preds
filled_info[col] = int(missing_idx.sum())
logging.info(f"模型插补完成:列 {col},填充 {filled_info[col]} 个缺失值。")
except Exception as e:
logging.error(f"模型插补失败:列 {col},错误: {e}")
filled_info[col] = 0
return df, filled_info
def build_data_dictionary(df_before: pd.DataFrame,
df_after: pd.DataFrame,
roles: dict,
strategy_map: dict) -> pd.DataFrame:
d = []
before_missing = compute_missing_rates(df_before).to_dict()
after_missing = compute_missing_rates(df_after).to_dict()
for c in df_after.columns:
role = []
if c in roles["key_cols"]:
role.append("key")
if roles["target_col"] and c == roles["target_col"]:
role.append("target")
if c in roles["date_cols"]:
role.append("date")
if c in roles["diag_cols"]:
role.append("diagnosis")
if c in roles["cat_cols"]:
role.append("categorical")
if c in roles["num_cols"]:
role.append("numeric")
if c in roles["text_cols"]:
role.append("text")
entry = {
"column": c,
"dtype": str(df_after[c].dtype),
"role": ",".join(role) if role else "other",
"n_unique": int(df_after[c].nunique(dropna=True)),
"missing_rate_before": float(before_missing.get(c, np.nan)),
"missing_rate_after": float(after_missing.get(c, np.nan)),
"impute_strategy": strategy_map.get(c, "none"),
"notes": ""
}
d.append(entry)
return pd.DataFrame(d)
def save_summary(output_dir: str,
df_clean: pd.DataFrame,
cols_dropped: list,
ffill_counts: dict,
knn_counts: dict,
fallback_counts: dict,
model_counts: dict,
missing_before: pd.Series,
missing_after: pd.Series,
config: dict,
dictionary_df: pd.DataFrame):
os.makedirs(output_dir, exist_ok=True)
# 清洗摘要
summary = {
"timestamp": datetime.now().isoformat(),
"n_rows": int(df_clean.shape[0]),
"n_cols": int(df_clean.shape[1]),
"columns_dropped": cols_dropped,
"ffill_counts": ffill_counts,
"knn_counts": knn_counts,
"fallback_counts": fallback_counts,
"model_counts": model_counts,
"missing_rate_before": missing_before.to_dict(),
"missing_rate_after": missing_after.to_dict(),
"config_used": config
}
with open(os.path.join(output_dir, "cleaning_summary.json"), "w", encoding="utf-8") as f:
json.dump(summary, f, ensure_ascii=False, indent=2)
# 另存 CSV 形式的缺失率前/后
missing_df = pd.DataFrame({
"column": missing_before.index,
"missing_rate_before": missing_before.values,
"missing_rate_after": missing_after.reindex(missing_before.index).values
})
missing_df.to_csv(os.path.join(output_dir, "missing_rates_before_after.csv"), index=False, encoding="utf-8")
# 字段字典
dictionary_df.to_csv(os.path.join(output_dir, "data_dictionary.csv"), index=False, encoding="utf-8")
dictionary_df.to_json(os.path.join(output_dir, "data_dictionary.json"), orient="records", force_ascii=False, indent=2)
# 保存配置
with open(os.path.join(output_dir, "cleaning_config_used.json"), "w", encoding="utf-8") as f:
json.dump(config, f, ensure_ascii=False, indent=2)
logging.info(f"清洗摘要与字段字典已保存到 {output_dir}")
# ------------------------------
# 主流程
# ------------------------------
def main():
parser = argparse.ArgumentParser(description="住院就诊记录缺失值清理脚本")
parser.add_argument("--input", type=str, required=True, help="输入 Parquet 文件路径(patient_visits.parquet)")
parser.add_argument("--output-dir", type=str, required=True, help="输出目录")
parser.add_argument("--seed", type=int, default=42, help="随机种子")
parser.add_argument("--log-level", type=str, default="INFO", help="日志级别(INFO/DEBUG/WARN/ERROR)")
# 删除高缺失列
parser.add_argument("--drop-threshold", type=float, default=0.95, help="按缺失率删除列的阈值,保护列除外")
parser.add_argument("--disable-drop", action="store_true", help="禁用按阈值删除高缺失列")
# 前向填充
parser.add_argument("--ffill-cols", type=str, default="BMI,smoker_status",
help="逗号分隔的列名列表,在同一 patient_id 内按 admission_date 前向填充;为空字符串表示不执行")
parser.add_argument("--disable-ffill", action="store_true", help="禁用前向填充")
# KNN 插补
parser.add_argument("--enable-knn", action="store_true", help="启用 KNN 插补数值列")
parser.add_argument("--n-neighbors", type=int, default=5, help="KNNImputer 的邻居数")
# 模型插补
parser.add_argument("--model-impute-cols", type=str, default="",
help="逗号分隔的列名列表,对这些列使用随机森林模型预测插补;为空表示不启用")
parser.add_argument("--n-estimators", type=int, default=200, help="随机森林基学习器数量(模型插补)")
# 输出控制
parser.add_argument("--save-csv", type=int, default=0, help="是否另存 CSV(1 保存,0 不保存)")
args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
setup_logging(args.output_dir, args.log_level)
set_global_seed(args.seed)
t0 = time.time()
logging.info("开始加载数据...")
df = load_parquet(args.input)
n_rows, n_cols = df.shape
logging.info(f"数据加载完成:{n_rows} 行,{n_cols} 列")
# 复制用于统计
df_before = df.copy(deep=True)
# 去重(按 visit_id)
if "visit_id" in df.columns:
dup_count = int(df.duplicated(subset=["visit_id"]).sum())
if dup_count > 0:
df = df.drop_duplicates(subset=["visit_id"], keep="first")
logging.info(f"删除重复 visit_id 记录:{dup_count}")
# 基础类型与规则
df = enforce_types_and_basic_rules(df)
df = apply_lab_sentinel_to_nan(df)
# 角色探测
roles = detect_column_roles(df)
logging.info(f"列角色探测结果:{ {k: (v if isinstance(v, str) else len(v)) for k, v in roles.items()} }")
# 删除高缺失列(保护列除外)
cols_dropped = []
if not args.disable_drop:
df, cols_dropped = drop_high_missing_columns(df, threshold=args.drop_threshold, protected_cols=roles["protected_cols"])
# 前向填充(按 patient_id + admission_date)
ffill_counts = {}
if not args.disable_ffill:
ffill_cols = [c.strip() for c in args.ffill_cols.split(",") if c.strip()]
df, ffill_counts = forward_fill_within_patient(
df=df,
group_col="patient_id" if "patient_id" in df.columns else None,
order_col="admission_date" if "admission_date" in df.columns else None,
cols=ffill_cols
)
missing_before = compute_missing_rates(df)
# KNN 插补(仅数值列;排除键、目标、日期)
knn_counts = {}
if args.enable_knn:
exclude_for_knn = roles["protected_cols"]
df, knn_counts = knn_impute_numeric(
df=df,
num_cols=roles["num_cols"],
exclude_cols=exclude_for_knn,
n_neighbors=args.n_neighbors,
scale=True,
seed=args.seed
)
    # 模型插补(对指定列;需在兜底之前执行,否则缺失位置已被中位数/众数填充,模型插补将无事可做)
    model_counts = {}
    model_cols = [c.strip() for c in args.model_impute_cols.split(",") if c.strip()]
    if len(model_cols) > 0:
        df, model_counts = model_impute_columns(df, model_cols, roles, seed=args.seed, n_estimators=args.n_estimators)
    # 兜底:数值中位数、类别众数
    fallback_counts = {}
    df, fallback_counts = median_mode_fallback(df, roles["num_cols"], roles["cat_cols"])
missing_after = compute_missing_rates(df)
# 构建策略映射(字段字典用)
strategy_map = {}
# 标记 KNN 使用的列
for c, cnt in knn_counts.items():
if cnt > 0:
strategy_map[c] = "KNN"
# 标记前向填充列
for c, cnt in ffill_counts.items():
if cnt > 0:
strategy_map[c] = "forward_fill" if c not in strategy_map else strategy_map[c] + "+forward_fill"
# 标记模型插补列
for c, cnt in model_counts.items():
if cnt > 0:
strategy_map[c] = "model" if c not in strategy_map else strategy_map[c] + "+model"
# 对未被上述捕获但有兜底的列
for c, cnt in fallback_counts.items():
if cnt > 0 and c not in strategy_map:
if c in roles["num_cols"]:
strategy_map[c] = "median"
elif c in roles["cat_cols"]:
strategy_map[c] = "mode"
# 输出
cleaned_parquet = os.path.join(args.output_dir, "patient_visits_cleaned.parquet")
df.to_parquet(cleaned_parquet, index=False)
if args.save_csv == 1:
cleaned_csv = os.path.join(args.output_dir, "patient_visits_cleaned.csv")
df.to_csv(cleaned_csv, index=False, encoding="utf-8")
# 字段字典
dictionary_df = build_data_dictionary(df_before, df, roles, strategy_map)
# 保存摘要、字典与配置
effective_config = {
"input": args.input,
"output_dir": args.output_dir,
"seed": args.seed,
"drop_threshold": args.drop_threshold,
"disable_drop": bool(args.disable_drop),
"ffill_cols": [c.strip() for c in args.ffill_cols.split(",") if c.strip()],
"disable_ffill": bool(args.disable_ffill),
"enable_knn": bool(args.enable_knn),
"n_neighbors": args.n_neighbors,
"model_impute_cols": model_cols,
"n_estimators": args.n_estimators,
"save_csv": args.save_csv,
"protected_cols": roles["protected_cols"]
}
save_summary(
output_dir=args.output_dir,
df_clean=df,
cols_dropped=cols_dropped,
ffill_counts=ffill_counts,
knn_counts=knn_counts,
fallback_counts=fallback_counts,
model_counts=model_counts,
missing_before=compute_missing_rates(df_before),
missing_after=missing_after,
config=effective_config,
dictionary_df=dictionary_df
)
elapsed = time.time() - t0
logging.info(f"清洗完成,耗时 {elapsed:.2f} 秒。输出文件位于:{args.output_dir}")
if __name__ == "__main__":
main()
要点说明:
如需仅使用中位数/众数兜底插补(不启用 KNN 与模型插补),并同时禁用前向填充与高缺失列删除: python clean_patient_visits.py --input patient_visits.parquet --output-dir ./cleaned --seed 42 --disable-ffill --disable-drop --save-csv 0
如需对 BMI 使用模型预测插补(在中位数/众数兜底之前,对仍缺失的位置用随机森林预测): python clean_patient_visits.py --input patient_visits.parquet --output-dir ./cleaned --seed 42 --model-impute-cols BMI --n-estimators 300 --enable-knn
备注:KNN 插补在 350k×若干数值特征时计算量较大;可通过 --n-neighbors 调整或禁用以加速。
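若数据规模较大,一个可选的提速思路(示意代码,函数名与参数均为示例假设,并非上文脚本的一部分)是仅在抽样行上拟合 KNNImputer,再对全量数据执行 transform,以减小近邻搜索的基数:
# 示意:抽样拟合 KNNImputer,再对全量数据 transform / Fit KNNImputer on a row sample, then transform all rows
import numpy as np
import pandas as pd
from sklearn.impute import KNNImputer

def knn_impute_with_sampled_fit(df: pd.DataFrame, num_cols: list,
                                n_neighbors: int = 5, sample_rows: int = 20000,
                                seed: int = 42) -> pd.DataFrame:
    rng = np.random.default_rng(seed)
    X = df[num_cols].to_numpy(dtype=float)
    idx = rng.choice(len(df), size=min(sample_rows, len(df)), replace=False)
    imputer = KNNImputer(n_neighbors=n_neighbors)
    imputer.fit(X[idx])                   # 仅在抽样行上拟合,降低近邻搜索成本 / fit on the sample only
    out = df.copy()
    out[num_cols] = imputer.transform(X)  # 对全量行插补 / impute all rows
    return out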
以下为可直接执行的 Python 脚本,按照设备维度并行处理、严格按时间顺序、在限定插补窗口内进行插值,支持时间索引对齐,并生成详细日志和质量报告。脚本含中英双语注释。
使用说明:
脚本内容:
#!/usr/bin/env python3
""" 设备遥测缺失值清理脚本 / Telemetry Missing-Value Cleaning Script
功能概述 / Overview:
目标字段与缺失特征 / Fields and Missingness:
业务约束 / Business Constraints:
输出 / Outputs:
"""
import argparse
import json
import logging
import os
from dataclasses import dataclass, asdict
from datetime import datetime
from typing import Dict, Any, Tuple, List
import numpy as np
import pandas as pd
@dataclass
class CleaningConfig:
    # 插值最大缺口(以分钟计)/ Max consecutive interpolation window (minutes)
    max_gap_temp: int = 5
    max_gap_vibration: int = 5
    max_gap_rpm: int = 3
    max_gap_humidity: int = 5
    max_gap_power: int = 2
# 边界有限前后向填充窗口(以分钟计)/ Edge limited ffill/bfill window (minutes)
edge_fill_limit: int = 1
# 电量固定填充值(表示N/A)/ Fixed fill for battery_level (N/A)
battery_level_fill: float = -1.0
# 类别缺失标签 / Categorical unknown label
unknown_label: str = "UNKNOWN"
# 是否对齐到每分钟索引 / Align to 1-minute index
align_to_minutely: bool = False
# 允许设备内排序(违背业务约束,默认禁止)/ Allow sorting within device (default False per constraint)
allow_sort_within_device: bool = False
# 是否在最终阶段丢弃仍含关键字段缺失的行 / Drop rows with remaining NA in critical fields
drop_rows_with_any_na: bool = False
# 关键数值字段 / Critical numeric fields
critical_numeric_fields: Tuple[str, ...] = (
"temp_c", "vibration_rms", "rpm"
)
# 输出是否按 device_id 分区存储 / Partition parquet by device_id
partition_output: bool = True
def setup_logging(output_dir: str) -> None:
    os.makedirs(output_dir, exist_ok=True)
    log_path = os.path.join(output_dir, "cleaning.log")
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(processName)s - %(message)s",
        handlers=[
            logging.FileHandler(log_path, mode="w", encoding="utf-8"),
            logging.StreamHandler()
        ]
    )
    logging.info("Log file initialized at %s", log_path)
def _na_run_lengths(s: pd.Series) -> List[int]:
    """
    计算连续缺失段长度(用于统计,不写回)/ Compute consecutive NA run lengths (for stats)
    """
    is_na = s.isna().to_numpy()
    if not is_na.any():
        return []
    # Find run lengths
    runs = []
    count = 0
    for v in is_na:
        if v:
            count += 1
        else:
            if count > 0:
                runs.append(count)
            count = 0
    if count > 0:
        runs.append(count)
    return runs
def _interpolate_limited(
    s: pd.Series, limit: int, method: str = "time", limit_area: str = "inside"
) -> Tuple[pd.Series, int]:
    """
    限制窗口插值 / Interpolation with window limit
    返回:插值后的序列、被填充的数量
    Return: interpolated series and count of filled values
    """
    before_na = s.isna().sum()
    if isinstance(s.index, pd.DatetimeIndex) and method == "time":
        out = s.interpolate(method="time", limit=limit, limit_area=limit_area)
    else:
        out = s.interpolate(method="linear", limit=limit, limit_area=limit_area)
    after_na = out.isna().sum()
    filled = int(before_na - after_na)
    return out, filled
def _ffill_bfill_limited(s: pd.Series, limit: int) -> Tuple[pd.Series, int, int]:
    """
    有限窗口前向/后向填充 / Limited ffill/bfill
    返回:新序列、ffill填充数、bfill填充数
    """
    na_before = s.isna().sum()
    s1 = s.ffill(limit=limit)
    ffilled = na_before - s1.isna().sum()
    s2 = s1.bfill(limit=limit)
    bfilled = s1.isna().sum() - s2.isna().sum()
    return s2, int(ffilled), int(bfilled)
def _normalize_status(s: pd.Series, unknown_label: str) -> Tuple[pd.Series, Dict[str, int]]:
    """
    规范化状态字段:限制到 {OK, WARN, FAIL, UNKNOWN}
    Normalize status values to {OK, WARN, FAIL, UNKNOWN}; limited ffill/bfill is applied externally.
    """
    allowed = {"OK", "WARN", "FAIL"}
    stats = {"invalid_to_unknown": 0}
    s_clean = s.copy()
    mask_invalid = ~s_clean.isna() & ~s_clean.astype(str).isin(allowed)
    stats["invalid_to_unknown"] = int(mask_invalid.sum())
    s_clean.loc[mask_invalid] = unknown_label
    return s_clean, stats
def ensure_time_index(df: pd.DataFrame, allow_sort: bool) -> Tuple[pd.DataFrame, bool]:
    """
    确保时间索引且序列单调上升 / Ensure datetime index and monotonic increasing
    返回:DataFrame,是否单调
    """
    df = df.copy()
    if "timestamp" not in df.columns:
        raise ValueError("timestamp column missing.")
    # 删除缺失时间戳的行(无法排序与对齐)/ Drop rows with missing timestamps
    drop_rows = df["timestamp"].isna().sum()
    if drop_rows > 0:
        df = df.loc[~df["timestamp"].isna()].copy()
df["timestamp"] = pd.to_datetime(df["timestamp"], errors="coerce", utc=False)
df = df.loc[~df["timestamp"].isna()].copy()
# 不重排:仅在已单调情况下设置索引;如不单调且不允许排序,则返回不单调标记
# No reordering: if not monotonic and sorting not allowed, return monotonic=False
is_monotonic = df["timestamp"].is_monotonic_increasing
if allow_sort:
# 业务约束默认不允许,只有明确允许时才排序
df = df.sort_values("timestamp", kind="stable")
is_monotonic = True
df = df.set_index("timestamp", drop=True)
return df, is_monotonic
def align_minutely(df: pd.DataFrame) -> Tuple[pd.DataFrame, int]:
    """
    对齐到每分钟索引(闭区间)/ Align to 1-minute frequency (inclusive)
    返回:新DF与新增行数 / new df and number of inserted rows
    """
    if df.empty:
        return df, 0
    start, end = df.index.min(), df.index.max()
    full_idx = pd.date_range(start=start, end=end, freq="T")
    before = len(df)
    out = df.reindex(full_idx)
    inserted = len(out) - before
    return out, int(inserted)
def process_device(
    device_id: Any, df_device: pd.DataFrame, cfg: CleaningConfig
) -> Tuple[pd.DataFrame, Dict[str, Any]]:
    """
    处理单个设备的缺失值清理 / Clean missing values for a single device
    """
    metrics: Dict[str, Any] = {
        "device_id": device_id,
        "rows_in": int(len(df_device)),
        "rows_timestamp_missing_dropped": int(df_device["timestamp"].isna().sum()),
        "inserted_minute_rows": 0,
        "monotonic_time": True,
        "interpolate_filled": {},
        "ffill_filled": {},
        "bfill_filled": {},
        "fixed_filled": {},
        "categorical_unknown_filled": {},
        "long_gaps_skipped": {},
        "rows_out": 0,
        "post_na_counts": {},
        "pre_na_counts": df_device.isna().sum().to_dict(),
    }
# 确保时间索引与单调性 / Ensure time index and monotonicity
df_ts, is_monotonic = ensure_time_index(df_device, allow_sort=cfg.allow_sort_within_device)
metrics["monotonic_time"] = bool(is_monotonic)
if not is_monotonic:
logging.warning("Device %s has non-monotonic timestamps; interpolation will be skipped; limited ffill/bfill only.", device_id)
# 可选:对齐到每分钟索引 / Optional: align to minutely index
if cfg.align_to_minutely:
df_ts, inserted = align_minutely(df_ts)
metrics["inserted_minute_rows"] = int(inserted)
# 回填 device_id / Restore device_id
df_ts["device_id"] = device_id
# 统计缺口分布(插值前)/ Gap runs before interpolation
gap_stats = {}
targets = {
"temp_c": cfg.max_gap_temp,
"vibration_rms": cfg.max_gap_vibration,
"rpm": cfg.max_gap_rpm,
"humidity": cfg.max_gap_humidity,
"power_w": cfg.max_gap_power,
}
for col, lim in targets.items():
if col in df_ts.columns:
runs = _na_run_lengths(df_ts[col])
gap_stats[col] = {
"num_na_runs": int(len(runs)),
"num_runs_gt_limit": int(sum(1 for r in runs if r > lim)),
}
else:
gap_stats[col] = {"num_na_runs": 0, "num_runs_gt_limit": 0}
metrics["long_gaps_skipped"] = {k: v["num_runs_gt_limit"] for k, v in gap_stats.items()}
# 数值列插值 / Interpolation for numeric fields
interp_filled_counts = {}
ffill_counts = {}
bfill_counts = {}
def do_interpolate(col: str, limit: int) -> None:
if col not in df_ts.columns:
return
s = df_ts[col]
if s.dtype.kind not in ("i", "u", "f", "b"): # 非数值跳过
return
if not metrics["monotonic_time"]:
# 非单调:跳过插值,仅做有限FFill/BFill
s2, ffc, bfc = _ffill_bfill_limited(s, cfg.edge_fill_limit)
df_ts[col] = s2
ffill_counts[col] = ffill_counts.get(col, 0) + ffc
bfill_counts[col] = bfill_counts.get(col, 0) + bfc
return
# 先限定窗口的时间/线性插值(禁止跨越长缺口与边界)/ Limited interpolation (no edges)
s2, filled = _interpolate_limited(
s, limit=limit, method="time", limit_area="inside"
)
interp_filled_counts[col] = interp_filled_counts.get(col, 0) + filled
# 边界再有限度前后向填充 / Edge-limited ffill/bfill
s3, ffc, bfc = _ffill_bfill_limited(s2, cfg.edge_fill_limit)
df_ts[col] = s3
ffill_counts[col] = ffill_counts.get(col, 0) + ffc
bfill_counts[col] = bfill_counts.get(col, 0) + bfc
# 针对目标列执行插值 / Apply interpolation to target columns
for col, lim in targets.items():
do_interpolate(col, lim)
# battery_level:固定值填充 + 指示列 / battery_level: fixed fill + indicator
fixed_counts = {}
if "battery_level" in df_ts.columns:
mask_batt_na = df_ts["battery_level"].isna()
fixed_counts["battery_level"] = int(mask_batt_na.sum())
if fixed_counts["battery_level"] > 0:
df_ts.loc[mask_batt_na, "battery_level"] = cfg.battery_level_fill
# 缺失标记 / Missing indicator
df_ts["battery_level_missing"] = mask_batt_na.astype(np.int8)
# status:有限FFill/BFill + UNKNOWN / status with limited ffill/bfill + UNKNOWN
cat_unknown_counts = {}
if "status" in df_ts.columns:
# 规范化异常值 / Normalize invalid values
s, st_stats = _normalize_status(df_ts["status"], cfg.unknown_label)
# 有限前后向填充(仅1)/ limited ffill/bfill
s2, ffc, bfc = _ffill_bfill_limited(s, cfg.edge_fill_limit)
s2 = s2.fillna(cfg.unknown_label)
df_ts["status"] = s2.astype("category")
cat_unknown_counts["status_invalid_to_unknown"] = st_stats["invalid_to_unknown"]
cat_unknown_counts["status_ffill"] = ffc
cat_unknown_counts["status_bfill"] = bfc
cat_unknown_counts["status_final_unknown"] = int((s2 == cfg.unknown_label).sum())
# 其他类别字段:site/firmware_ver -> UNKNOWN / Other categoricals to UNKNOWN
for col in ("site", "firmware_ver"):
if col in df_ts.columns:
na_cnt = int(df_ts[col].isna().sum())
if na_cnt > 0:
df_ts[col] = df_ts[col].astype("string")
df_ts[col] = df_ts[col].fillna(cfg.unknown_label)
cat_unknown_counts[col] = na_cnt
# 最终丢弃策略(可选)/ Optional final drop of rows with NA in critical numeric fields
rows_before_drop = len(df_ts)
if cfg.drop_rows_with_any_na:
mask_any_na = df_ts[list(cfg.critical_numeric_fields)].isna().any(axis=1)
drop_cnt = int(mask_any_na.sum())
if drop_cnt > 0:
df_ts = df_ts.loc[~mask_any_na].copy()
metrics["rows_dropped_final_na"] = drop_cnt
else:
metrics["rows_dropped_final_na"] = 0
# 指标汇总 / Metrics aggregation
metrics["interpolate_filled"] = interp_filled_counts
metrics["ffill_filled"] = ffill_counts
metrics["bfill_filled"] = bfill_counts
metrics["fixed_filled"] = fixed_counts
metrics["categorical_unknown_filled"] = cat_unknown_counts
metrics["rows_out"] = int(len(df_ts))
metrics["post_na_counts"] = df_ts.isna().sum().to_dict()
# 确保列顺序稳定(不重排记录,仅输出整理)/ Keep stable column order for output
if "timestamp" not in df_ts.columns:
# timestamp 现在是索引 / it's the index
df_ts = df_ts.reset_index().rename(columns={"index": "timestamp"})
return df_ts, metrics
def main():
    parser = argparse.ArgumentParser(description="Telemetry missing value cleaning / 设备遥测缺失值清理")
    parser.add_argument("--input", required=True, help="Input Parquet path / 输入 Parquet 路径")
    parser.add_argument("--output_dir", required=True, help="Output directory / 输出目录")
    parser.add_argument("--num_workers", type=int, default=1, help="Parallel workers by device / 并行进程数(按设备分区)")
    parser.add_argument("--align_to_minutely", action="store_true", help="Align time index to 1-minute / 对齐到每分钟索引")
    parser.add_argument("--allow_sort_within_device", action="store_true", help="Allow sorting within device (not recommended) / 允许设备内排序(不建议)")
    parser.add_argument("--drop_rows_with_any_na", action="store_true", help="Drop rows with remaining NA in critical fields / 丢弃仍含关键缺失的行")
    args = parser.parse_args()
os.makedirs(args.output_dir, exist_ok=True)
setup_logging(args.output_dir)
cfg = CleaningConfig(
align_to_minutely=args.align_to_minutely,
allow_sort_within_device=args.allow_sort_within_device,
drop_rows_with_any_na=args.drop_rows_with_any_na,
)
logging.info("Cleaning configuration / 清理配置: %s", json.dumps(asdict(cfg), ensure_ascii=False))
logging.info("Interpolation window limits / 插补窗口限制: temp<=%d, vibration<=%d, rpm<=%d, humidity<=%d, power<=%d; edge ffill/bfill<=%d",
cfg.max_gap_temp, cfg.max_gap_vibration, cfg.max_gap_rpm, cfg.max_gap_humidity, cfg.max_gap_power, cfg.edge_fill_limit)
logging.info("Strategies used / 策略: [使用插值法填充, 使用前向填充, 使用后向填充, 类别型字段用‘未知’标记填充, 数值型字段用固定值填充, 删除含缺失值的行(仅限timestamp或可选最终阶段)]")
# 读取数据 / Read parquet
logging.info("Reading input parquet / 读取Parquet: %s", args.input)
df = pd.read_parquet(args.input)
expected_cols = {"device_id", "timestamp"}
missing_essential = expected_cols - set(df.columns)
if missing_essential:
raise ValueError(f"Missing essential columns: {missing_essential}")
# 确保原始行顺序不变(不进行全局排序)/ Do not reorder globally
total_rows = len(df)
logging.info("Rows total: %d, Columns: %d", total_rows, len(df.columns))
# 按设备分区 / Partition by device
device_ids = df["device_id"].dropna().unique().tolist()
logging.info("Unique devices: %d", len(device_ids))
# 组装每个设备的子表(保持原顺序)/ Build per-device DataFrames (preserve original order)
groups = {dev: df.loc[df["device_id"] == dev].copy() for dev in device_ids}
# 并行处理 / Parallel processing
results: List[pd.DataFrame] = []
metrics_all: List[Dict[str, Any]] = []
if args.num_workers and args.num_workers > 1:
import multiprocessing as mp
with mp.get_context("spawn").Pool(processes=args.num_workers) as pool:
jobs = []
for dev, gdf in groups.items():
jobs.append(pool.apply_async(process_device, (dev, gdf, cfg)))
for j in jobs:
cleaned_df, met = j.get()
results.append(cleaned_df)
metrics_all.append(met)
else:
for dev, gdf in groups.items():
cleaned_df, met = process_device(dev, gdf, cfg)
results.append(cleaned_df)
metrics_all.append(met)
# 合并结果 / Concatenate results
cleaned = pd.concat(results, ignore_index=True)
# 写出结果 / Write outputs
if cfg.partition_output:
# 分区写出 / Partition by device_id
out_root = os.path.join(args.output_dir, "parquet_by_device")
os.makedirs(out_root, exist_ok=True)
# 使用 pyarrow 分区写出(pandas>=1.4 支持 partition_cols)/ partitioned write
cleaned.to_parquet(
out_root,
engine="pyarrow",
index=False,
partition_cols=["device_id"]
)
logging.info("Partitioned parquet written to: %s", out_root)
else:
out_path = os.path.join(args.output_dir, "telemetry_cleaned.parquet")
cleaned.to_parquet(out_path, engine="pyarrow", index=False)
logging.info("Parquet written to: %s", out_path)
# 质量报告 / Quality report
# 聚合统计 / Aggregate metrics
agg = {
"generated_at": datetime.utcnow().isoformat() + "Z",
"total_devices": len(device_ids),
"rows_in_total": int(sum(m["rows_in"] for m in metrics_all)),
"rows_out_total": int(sum(m["rows_out"] for m in metrics_all)),
"rows_timestamp_missing_dropped_total": int(sum(m["rows_timestamp_missing_dropped"] for m in metrics_all)),
"inserted_minute_rows_total": int(sum(m["inserted_minute_rows"] for m in metrics_all)),
"devices_non_monotonic": int(sum(1 for m in metrics_all if not m["monotonic_time"])),
"interpolate_filled_total": {},
"ffill_filled_total": {},
"bfill_filled_total": {},
"fixed_filled_total": {},
"categorical_unknown_filled_total": {},
"post_na_counts_total": {},
}
# 累加器 / reducers
def add_dict(acc: Dict[str, int], x: Dict[str, int]):
for k, v in x.items():
acc[k] = acc.get(k, 0) + int(v)
for m in metrics_all:
add_dict(agg["interpolate_filled_total"], m.get("interpolate_filled", {}))
add_dict(agg["ffill_filled_total"], m.get("ffill_filled", {}))
add_dict(agg["bfill_filled_total"], m.get("bfill_filled", {}))
add_dict(agg["fixed_filled_total"], m.get("fixed_filled", {}))
add_dict(agg["categorical_unknown_filled_total"], m.get("categorical_unknown_filled", {}))
add_dict(agg["post_na_counts_total"], m.get("post_na_counts", {}))
# 保存报告 / Save reports
report_path = os.path.join(args.output_dir, "quality_report.json")
with open(report_path, "w", encoding="utf-8") as f:
json.dump({"config": asdict(cfg), "aggregate": agg, "per_device": metrics_all}, f, ensure_ascii=False, indent=2)
logging.info("Quality report written to: %s", report_path)
logging.info("Done. Rows in: %d, Rows out: %d", agg["rows_in_total"], agg["rows_out_total"])
if name == "main": main()
实现细节与策略说明:
如需根据不同设备类型(有线/电池)动态调整 battery_level 策略,可在 process_device 内加入基于电源特征(如 power_w 稳定>0 且 battery_level 全缺失)来切换为完全置空+指示列或保持 -1 语义。
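一种可行的判断片段如下(仅为示意,列名沿用上文约定,阈值为假设值),可在 process_device 中 battery_level 处理之前调用,用返回的策略决定保持缺失并加指示列,还是沿用 -1 固定填充:
# 示意:根据电源特征推断 battery_level 策略 / Infer battery_level strategy from power readings (thresholds are assumptions)
import pandas as pd

def infer_battery_strategy(df_dev: pd.DataFrame) -> str:
    has_stable_power = (
        "power_w" in df_dev.columns
        and df_dev["power_w"].notna().mean() > 0.9            # 功率读数基本完整 / mostly present
        and (df_dev["power_w"].dropna() > 0).mean() > 0.95     # 且稳定大于 0 / stably positive
    )
    battery_all_missing = (
        "battery_level" in df_dev.columns and df_dev["battery_level"].isna().all()
    )
    # 有线设备:保持缺失并输出指示列;电池设备:沿用 -1 固定填充 / wired: keep NA + indicator; battery: keep -1 fill
    if has_stable_power and battery_all_missing:
        return "keep_na_with_indicator"
    return "fixed_fill_minus_one"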
以“缺失值清洗”为切入点,为数据分析、算法与BI团队提供一条从需求到可运行脚本的快速通道:让AI扮演数据挖掘专家,基于你提供的数据集特征与业务约束,稳定产出结构清晰、可直接嵌入流水线的Python清洗脚本;统一处理口径,降低人为差错,缩短数据准备周期,提升模型训练与报表输出的准确性,并支持多语言回应以便跨团队协作和交付。
为电商、金融或运营数据一键生成缺失值清洗脚本,快速试用多种填补方案,沉淀可复用模板,提升建模与报表的可信度。
在入库前完成缺失率审计与自动填补,输出对照表和异常清单,保障看板口径一致,减少手工修补时间。
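一个简化的审计示意(函数与输出文件名均为示例假设,阈值可按业务口径调整):
# 示意:入库前缺失率审计,输出清洗前后对照表与高缺失字段清单 / Missing-rate audit before loading
import pandas as pd

def audit_missing(df_before: pd.DataFrame, df_after: pd.DataFrame,
                  out_prefix: str, high_missing_threshold: float = 0.3) -> pd.DataFrame:
    rates = pd.DataFrame({
        "missing_rate_before": df_before.isna().mean(),
        "missing_rate_after": df_after.isna().mean().reindex(df_before.columns),
    })
    rates.to_csv(f"{out_prefix}_missing_rates.csv", encoding="utf-8")
    # 高缺失字段清单,作为异常清单的一种简化形式 / columns above threshold, a simplified anomaly list
    rates[rates["missing_rate_before"] > high_missing_threshold].to_csv(
        f"{out_prefix}_high_missing_columns.csv", encoding="utf-8"
    )
    return rates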
通过带注释的脚本理解常见填补方法,安全试运行并查看影响,快速从入门到上手项目实战。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。