Write a Python script that standardizes a given dataset.
The following script standardizes and uniformly encodes a generic tabular dataset of roughly 100k rows that contains numeric, categorical, and date fields with missing values and outliers. Core processing:

- Numeric fields: median imputation of missing values, IQR-based outlier clipping, standardization (StandardScaler).
- Categorical fields: text normalization (Unicode NFKC, whitespace/case), rare-category folding (to avoid dimension explosion), consistent One-Hot encoding (handle_unknown="ignore").
- Date fields: automatic detection and parsing to datetime, median imputation, derived numeric features (year/month/day/weekday, etc.), standardization.
- Unified encoding: UTF-8 output with normalized strings, so differing encodings cannot split one category into several.
- Reusable: organized as a scikit-learn Pipeline/ColumnTransformer for train/inference consistency; outputs both the processed data and the fitted preprocessor.

Usage (command line):

- Install dependencies: pip install pandas "scikit-learn>=1.2" joblib pyarrow
- Run: python preprocess_standardize.py --input data.csv --output processed.parquet --sep , --encoding utf-8

Script preprocess_standardize.py:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import os
import re
import unicodedata
import warnings
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
from joblib import dump
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

warnings.filterwarnings("ignore", category=FutureWarning)


# ===================== Helpers =====================

def normalize_text(s: str) -> str:
    """Unify text encoding and format: Unicode NFKC, strip outer whitespace,
    collapse inner whitespace to a single space, lowercase.
    Preserves the original meaning; no translation or mapping is applied.
    """
    if s is None:
        return s
    # Unicode normalization
    s = unicodedata.normalize("NFKC", str(s))
    # Strip leading/trailing whitespace
    s = s.strip()
    # Collapse runs of whitespace
    s = re.sub(r"\s+", " ", s)
    # Lowercase
    s = s.lower()
    return s


def is_date_like_series(series: pd.Series, sample_size: int = 5000, threshold: float = 0.8) -> bool:
    """Sampling-based heuristic: try to_datetime; if the success rate is >= threshold,
    treat the column as a date field."""
    s = series.dropna()
    if s.empty:
        return False
    n = min(len(s), sample_size)
    sample = s.sample(n, random_state=42) if len(s) > n else s
    parsed = pd.to_datetime(sample, errors="coerce")
    success_rate = parsed.notna().mean()
    return success_rate >= threshold


def infer_column_types(df: pd.DataFrame) -> Tuple[List[str], List[str], List[str]]:
    """Infer numeric, date, and categorical columns."""
    numeric_cols = df.select_dtypes(include=["number"]).columns.tolist()
    # Object columns are candidates for date and categorical detection
    obj_cols = df.select_dtypes(include=["object"]).columns.tolist()
    bool_cols = df.select_dtypes(include=["bool"]).columns.tolist()
    cat_dtype_cols = [c for c in df.columns if str(df[c].dtype) == "category"]

    # Detect date columns first
    date_cols = []
    for c in obj_cols:
        try:
            if is_date_like_series(df[c]):
                date_cols.append(c)
        except Exception:
            continue

    # Remaining object columns are treated as categorical
    categorical_cols = [c for c in obj_cols if c not in date_cols] + bool_cols + cat_dtype_cols

    # Deduplicate while preserving order
    categorical_cols = list(dict.fromkeys(categorical_cols))
    date_cols = list(dict.fromkeys(date_cols))
    numeric_cols = list(dict.fromkeys(numeric_cols))

    # Resolve overlaps
    numeric_cols = [c for c in numeric_cols if c not in date_cols]
    categorical_cols = [c for c in categorical_cols if c not in numeric_cols and c not in date_cols]
    return numeric_cols, date_cols, categorical_cols


# ===================== Custom transformers =====================

class NumericOutlierClipperImputer(BaseEstimator, TransformerMixin):
    """Numeric columns: clip outliers by IQR and fill missing values with the median."""

    def __init__(self, columns: List[str], iqr_k: float = 1.5):
        self.columns = columns
        self.iqr_k = iqr_k
        self.bounds_: Dict[str, Tuple[float, float]] = {}
        self.medians_: Dict[str, float] = {}

    def fit(self, X: pd.DataFrame, y=None):
        for c in self.columns:
            col = pd.to_numeric(X[c], errors="coerce")
            q1 = col.quantile(0.25)
            q3 = col.quantile(0.75)
            iqr = q3 - q1
            lower = q1 - self.iqr_k * iqr
            upper = q3 + self.iqr_k * iqr
            self.bounds_[c] = (lower, upper)
            self.medians_[c] = col.median()
        return self

    def transform(self, X: pd.DataFrame):
        X_out = X.copy()
        for c in self.columns:
            col = pd.to_numeric(X_out[c], errors="coerce")
            lower, upper = self.bounds_[c]
            col = col.clip(lower=lower, upper=upper)
            col = col.fillna(self.medians_[c])
            X_out[c] = col
        return X_out[self.columns]


class RareCategoryGrouper(BaseEstimator, TransformerMixin):
    """Categorical columns: fold low-frequency categories into '__rare__'
    (avoids One-Hot dimension explosion).
    Controlled either by the ratio min_freq or the absolute threshold min_count.
    """

    def __init__(self, columns: List[str], min_freq: float = 0.01, min_count: int = None):
        self.columns = columns
        self.min_freq = min_freq
        self.min_count = min_count
        self.frequent_cats_: Dict[str, set] = {}

    def fit(self, X: pd.DataFrame, y=None):
        n = len(X)
        for c in self.columns:
            s = X[c].astype("object")
            vc = s.value_counts(dropna=False)
            if self.min_count is not None:
                threshold_count = self.min_count
            else:
                threshold_count = max(1, int(np.floor(self.min_freq * n)))
            keep = set(vc[vc >= threshold_count].index.tolist())
            # Always keep the missing-value placeholder
            keep.add("__missing__")
            self.frequent_cats_[c] = keep
        return self

    def transform(self, X: pd.DataFrame):
        X_out = X.copy()
        for c in self.columns:
            s = X_out[c].astype("object")
            s = s.where(s.isin(self.frequent_cats_[c]), "__rare__")
            X_out[c] = s
        return X_out[self.columns]


class CategoricalTextNormalizer(BaseEstimator, TransformerMixin):
    """Categorical columns: unify text via NFKC, strip, whitespace collapsing, lowercase;
    missing values become '__missing__'."""

    def __init__(self, columns: List[str]):
        self.columns = columns

    def fit(self, X: pd.DataFrame, y=None):
        return self

    def transform(self, X: pd.DataFrame):
        X_out = X.copy()
        for c in self.columns:
            s = X_out[c].astype("object")
            s = s.map(lambda v: "__missing__" if pd.isna(v) else normalize_text(v))
            # Empty strings are treated as missing as well
            s = s.replace("", "__missing__")
            X_out[c] = s
        return X_out[self.columns]


class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    """Date columns: derive uniform numeric features and impute missing dates with the median;
    only the derived features are returned (the raw date column is dropped).
    Extracted features: year, month, day, dayofweek, dayofyear, week,
    is_month_start, is_month_end, is_quarter_start, is_quarter_end.
    """

    def __init__(self, columns: List[str]):
        self.columns = columns
        self.medians_: Dict[str, pd.Timestamp] = {}
        self.feature_names_: List[str] = []

    def fit(self, X: pd.DataFrame, y=None):
        for c in self.columns:
            dt = pd.to_datetime(X[c], errors="coerce")
            med = dt.dropna().median()
            # Fall back to the Unix epoch if every value is NaT
            if pd.isna(med):
                med = pd.Timestamp("1970-01-01")
            self.medians_[c] = med
        # Precompute the output column names
        feats = ["year", "month", "day", "dayofweek", "dayofyear", "week",
                 "is_month_start", "is_month_end", "is_quarter_start", "is_quarter_end"]
        self.feature_names_ = [f"{c}__{f}" for c in self.columns for f in feats]
        return self

    def transform(self, X: pd.DataFrame):
        out = {}
        for c in self.columns:
            dt = pd.to_datetime(X[c], errors="coerce")
            dt = dt.fillna(self.medians_[c])
            # isocalendar().week returns UInt32; cast to int
            iso = dt.dt.isocalendar()
            out[f"{c}__year"] = dt.dt.year.astype(np.int32)
            out[f"{c}__month"] = dt.dt.month.astype(np.int32)
            out[f"{c}__day"] = dt.dt.day.astype(np.int32)
            out[f"{c}__dayofweek"] = dt.dt.dayofweek.astype(np.int32)
            out[f"{c}__dayofyear"] = dt.dt.dayofyear.astype(np.int32)
            out[f"{c}__week"] = iso.week.astype("int32")
            out[f"{c}__is_month_start"] = dt.dt.is_month_start.astype(np.int8)
            out[f"{c}__is_month_end"] = dt.dt.is_month_end.astype(np.int8)
            out[f"{c}__is_quarter_start"] = dt.dt.is_quarter_start.astype(np.int8)
            out[f"{c}__is_quarter_end"] = dt.dt.is_quarter_end.astype(np.int8)
        return pd.DataFrame(out, index=X.index)


# ===================== Main flow =====================

def build_preprocessor(df: pd.DataFrame) -> Tuple[Pipeline, Tuple[List[str], List[str], List[str]]]:
    """Build the overall preprocessor (ColumnTransformer + pipelines);
    returns the pipeline and the inferred column groups."""
    numeric_cols, date_cols, categorical_cols = infer_column_types(df)

    # Numeric pipeline: clip + impute + standardize
    numeric_pipe = Pipeline(steps=[
        ("clip_impute", NumericOutlierClipperImputer(columns=numeric_cols, iqr_k=1.5)),
        ("scaler", StandardScaler(with_mean=True, with_std=True))
    ])

    # Date pipeline: derived features + standardize
    date_pipe = Pipeline(steps=[
        ("date_features", DateFeatureExtractor(columns=date_cols)),
        ("scaler", StandardScaler(with_mean=True, with_std=True))
    ])

    # Categorical pipeline: text normalization + rare-category folding + One-Hot
    categorical_pipe = Pipeline(steps=[
        ("text_norm", CategoricalTextNormalizer(columns=categorical_cols)),
        ("rare", RareCategoryGrouper(columns=categorical_cols, min_freq=0.01, min_count=None)),
        # sparse_output requires scikit-learn >= 1.2 (the old `sparse` flag was removed in 1.4)
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=False))
    ])

    # Assemble per column group
    transformers = []
    if numeric_cols:
        transformers.append(("num", numeric_pipe, numeric_cols))
    if date_cols:
        transformers.append(("date", date_pipe, date_cols))
    if categorical_cols:
        transformers.append(("cat", categorical_pipe, categorical_cols))

    preprocessor = ColumnTransformer(transformers=transformers, remainder="drop", sparse_threshold=0.0)
    pipeline = Pipeline(steps=[("preprocessor", preprocessor)])
    # Output feature names can only be determined after fitting (especially for OHE)
    return pipeline, (numeric_cols, date_cols, categorical_cols)


def feature_names_after_fit(pipeline: Pipeline, numeric_cols: List[str], date_cols: List[str],
                            categorical_cols: List[str]) -> List[str]:
    """Extract the output feature names from the fitted ColumnTransformer.
    Compatible with different sklearn versions."""
    ct: ColumnTransformer = pipeline.named_steps["preprocessor"]
    names = []
    for name, transformer, cols in ct.transformers_:
        if name == "num":
            # StandardScaler keeps column names
            names.extend(cols)
        elif name == "date":
            # Derived names produced by DateFeatureExtractor
            df_extractor: DateFeatureExtractor = transformer.named_steps["date_features"]
            names.extend(df_extractor.feature_names_)
        elif name == "cat":
            ohe: OneHotEncoder = transformer.named_steps["ohe"]
            try:
                # Newer versions support get_feature_names_out
                ohe_names = ohe.get_feature_names_out(categorical_cols).tolist()
            except Exception:
                # Fallback: combine column names with the learned categories
                ohe_names = []
                for i, c in enumerate(categorical_cols):
                    cats = ohe.categories_[i]
                    ohe_names.extend([f"{c}__{cat}" for cat in cats])
            names.extend(ohe_names)
    return names


def main():
    parser = argparse.ArgumentParser(description="Dataset standardization and unified-encoding preprocessing script")
    parser.add_argument("--input", required=True, help="Path to the input CSV file")
    parser.add_argument("--output", required=True, help="Path to the output file (.parquet recommended)")
    parser.add_argument("--sep", default=",", help="CSV separator, default ','")
    parser.add_argument("--encoding", default="utf-8", help="Input file encoding, default utf-8")
    parser.add_argument("--save-pipeline", default="preprocessor.joblib", help="Where to save the fitted preprocessor")
    parser.add_argument("--nrows", type=int, default=None, help="Optional: read only the first n rows for a quick trial run")
    args = parser.parse_args()

    # Load the data
    df = pd.read_csv(args.input, sep=args.sep, encoding=args.encoding, low_memory=False, nrows=args.nrows)

    # Build and fit the preprocessor
    pipeline, (numeric_cols, date_cols, categorical_cols) = build_preprocessor(df)
    pipeline.fit(df)

    # Transform the data
    X = pipeline.transform(df)

    # Resolve feature names and build the output DataFrame
    feat_names = feature_names_after_fit(pipeline, numeric_cols, date_cols, categorical_cols)
    # Fall back to generic names if the lengths do not match
    if X.shape[1] != len(feat_names):
        feat_names = [f"f_{i}" for i in range(X.shape[1])]
    out_df = pd.DataFrame(X, columns=feat_names, index=df.index)

    # Save as Parquet (UTF-8, compressed) or CSV
    out_path = args.output
    ext = os.path.splitext(out_path)[1].lower()
    if ext in [".parquet", ".pq"]:
        try:
            out_df.to_parquet(out_path, engine="pyarrow", index=False)
        except Exception:
            # Fall back to fastparquet
            out_df.to_parquet(out_path, engine="fastparquet", index=False)
    else:
        # CSV output, always UTF-8
        out_df.to_csv(out_path, index=False, encoding="utf-8")

    # Save the preprocessor so training and inference stay consistent
    dump(pipeline, args.save_pipeline)

    # Summary
    print("Preprocessing finished:")
    print(f"- numeric columns: {len(numeric_cols)}")
    print(f"- date columns: {len(date_cols)}")
    print(f"- categorical columns: {len(categorical_cols)}")
    print(f"- output features: {out_df.shape[1]}")
    print(f"- data saved to: {out_path}")
    print(f"- preprocessor saved to: {args.save_pipeline}")


if __name__ == "__main__":
    main()
```

Notes and suggestions:

- IQR clipping combined with StandardScaler suits numeric distributions that contain outliers; if outliers are extreme and frequent, replace StandardScaler with RobustScaler for extra robustness (see the sketch below).
- The rare-category threshold min_freq=1% keeps One-Hot dimensionality under control; raise or lower it based on the actual number of unique values and available memory, or use min_count for an absolute threshold.
- Date features are derived as plain numbers and then standardized, so no raw datetime column survives; extend DateFeatureExtractor if the business needs more temporal features (e.g., offsets from a reference date, holiday flags).
- The script does not depend on a target variable; in a modeling workflow, fit the pipeline on the training set only and call transform on validation/test sets so no data leaks (also shown in the sketch below).
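To make the last two suggestions concrete, here is a minimal sketch. It assumes the script above is saved as preprocess_standardize.py in the working directory and that data.csv needs no target column handling; the RobustScaler swap is an optional, hypothetical tweak rather than part of the script itself:

```python
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler

from preprocess_standardize import build_preprocessor  # the script above, saved locally

df = pd.read_csv("data.csv")
train_df, valid_df = train_test_split(df, test_size=0.2, random_state=42)

# Column types and the pipeline are inferred from the training split only.
pipeline, (num_cols, date_cols, cat_cols) = build_preprocessor(train_df)

# Optional robustness tweak: swap the numeric StandardScaler for RobustScaler
# (only meaningful if a numeric branch exists, i.e. num_cols is non-empty).
if num_cols:
    pipeline.set_params(preprocessor__num__scaler=RobustScaler())

pipeline.fit(train_df)                  # fit on the training split only
X_train = pipeline.transform(train_df)
X_valid = pipeline.transform(valid_df)  # reuse the fitted statistics: no leakage
```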
The following is an executable standardization script for a common "transactions table + users table" situation: non-unique primary keys, inconsistent categorical encoding, and timezones that need unifying. The script can:

- Normalize column names to snake_case
- Unify time columns (configured or auto-detected) to a target timezone
- Deduplicate by a configurable primary key (by default keeping the "latest" record according to a time-priority order)
- Clean categorical fields consistently and encode them with a single vocabulary shared across both tables
- Output the cleaned tables, the encoding vocabulary, a data dictionary, and a deduplication report

Specify the primary keys, timezone, and time-priority columns that match your actual field names via command-line arguments or configuration.

The code:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Standardization pipeline: transactions table and users table

Features:
- Column-name normalization
- Primary-key deduplication (keeping the latest record by time priority)
- Timezone unification
- Consistent categorical cleaning and encoding
- Cleaned outputs plus metadata artifacts

Dependencies:
- Python 3.8+
- pandas, numpy

Example:
    python standardize_pipeline.py \
        --transactions-path ./data/transactions.csv \
        --users-path ./data/users.parquet \
        --output-dir ./out \
        --transactions-pk transaction_id \
        --users-pk user_id \
        --target-tz UTC \
        --naive-source-tz Asia/Shanghai \
        --transactions-time-priority event_time,updated_at \
        --users-time-priority updated_at,created_at \
        --infer-categorical
"""
import argparse
import json
import os
import re
import sys
import unicodedata
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# Prefer zoneinfo; fall back to pytz
try:
    from zoneinfo import ZoneInfo  # Python 3.9+
    _tz = ZoneInfo
except Exception:
    try:
        import pytz

        def _tz(name: str):
            return pytz.timezone(name)
    except Exception:
        _tz = None


# ============ Helpers ============

def standardize_colname(name: str) -> str:
    """Normalize a column name to snake_case, trimming whitespace and character width."""
    if name is None:
        return name
    s = unicodedata.normalize("NFKC", str(name)).strip()
    s = s.replace("-", "_").replace("/", "_").replace(".", "_")
    s = re.sub(r"[^\w]+", "_", s)
    s = re.sub(r"_+", "_", s)
    s = s.strip("_")
    return s.lower()


def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [standardize_colname(c) for c in df.columns]
    return df


def read_any(path: str) -> pd.DataFrame:
    """Read CSV/Parquet/Feather based on the file extension."""
    ext = os.path.splitext(path)[1].lower()
    if ext in [".csv", ".txt"]:
        return pd.read_csv(path)
    if ext in [".parquet", ".pq"]:
        return pd.read_parquet(path)
    if ext in [".feather", ".ft"]:
        return pd.read_feather(path)
    raise ValueError(f"Unsupported file extension: {ext}")


def write_any(df: pd.DataFrame, path: str):
    """Write to CSV or Parquet based on the file extension."""
    ext = os.path.splitext(path)[1].lower()
    os.makedirs(os.path.dirname(path), exist_ok=True)
    if ext in [".csv", ".txt"]:
        df.to_csv(path, index=False)
    elif ext in [".parquet", ".pq"]:
        df.to_parquet(path, index=False)
    else:
        raise ValueError(f"Unsupported output extension: {ext}")


def detect_datetime_columns(df: pd.DataFrame, sample_size: int = 1000, success_rate: float = 0.6) -> List[str]:
    """Sampling-based, robust detection of datetime columns."""
    cols = []
    n = min(len(df), sample_size)
    sample_df = df.sample(n) if n > 0 else df
    for c in df.columns:
        if df[c].dtype.kind in "Mm":
            # Already a datetime64 dtype
            cols.append(c)
            continue
        if df[c].dtype == object or np.issubdtype(df[c].dtype, np.number):
            try:
                parsed = pd.to_datetime(sample_df[c], errors="coerce")
                rate = parsed.notna().mean() if len(parsed) > 0 else 0.0
                if rate >= success_rate:
                    cols.append(c)
            except Exception:
                pass
    return cols


def parse_any_timestamp(val, naive_source_tz: Optional[str], target_tz: str):
    """Value-level timestamp parsing and timezone unification. Robust to mixed formats."""
    if pd.isna(val):
        return pd.NaT
    try:
        # Numbers are treated as epoch seconds/milliseconds
        if isinstance(val, (int, np.integer, float, np.floating)) and not isinstance(val, bool):
            v = float(val)
            if np.isnan(v):
                return pd.NaT
            # Crude magnitude check
            if abs(v) >= 1e12:
                # milliseconds
                ts = pd.to_datetime(int(v), unit="ms", utc=False)
            else:
                # seconds
                ts = pd.to_datetime(int(v), unit="s", utc=False)
        else:
            ts = pd.to_datetime(val, errors="coerce", utc=False)
        if ts is pd.NaT:
            return pd.NaT

        # Timezone handling
        if getattr(ts, "tzinfo", None) is None:
            if naive_source_tz:
                if _tz is None:
                    raise RuntimeError("No timezone library (zoneinfo/pytz) available; cannot localize naive timestamps")
                ts = ts.tz_localize(_tz(naive_source_tz), ambiguous="NaT", nonexistent="NaT")
            else:
                # By default treat naive values as UTC to avoid wrong offsets (override via CLI)
                if _tz is None:
                    return ts  # degrade to naive
                ts = ts.tz_localize(_tz("UTC"))
        if _tz is None:
            return ts  # cannot convert any further
        return ts.tz_convert(_tz(target_tz))
    except Exception:
        return pd.NaT


def unify_timezones(
    df: pd.DataFrame,
    tz_target: str,
    tz_source_naive: Optional[str],
    datetime_cols: Optional[List[str]] = None
) -> pd.DataFrame:
    """Unify the given datetime columns to the target timezone; auto-detect when unspecified."""
    if tz_target is None:
        return df
    if datetime_cols is None:
        datetime_cols = detect_datetime_columns(df)
    if not datetime_cols:
        return df
    out = df.copy()
    for c in datetime_cols:
        out[c] = out[c].apply(lambda x: parse_any_timestamp(x, tz_source_naive, tz_target))
    return out


def last_non_null_with_priority(df: pd.DataFrame, sort_cols: List[str]) -> pd.Series:
    """
    After sorting by time priority, return the last non-null value of every column.
    Assumes df is already sorted by sort_cols (ascending/descending is controlled by the caller).
    """
    return df.ffill().bfill().iloc[-1]


def deduplicate_by_pk(
    df: pd.DataFrame,
    pk: str,
    time_priority: Optional[List[str]] = None,
    ascending: bool = True
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    Deduplicate by primary key; by default sort by the time-priority columns and keep the "latest" record.
    Returns the deduplicated frame and the removed duplicate rows.
    """
    if pk not in df.columns:
        raise ValueError(f"Primary key column {pk} not found in the dataset")

    dup_mask = df[pk].duplicated(keep=False)
    dup_df = df.loc[dup_mask].copy()

    if time_priority:
        # Use only sort columns that actually exist
        sort_cols = [c for c in time_priority if c in df.columns]
        if sort_cols:
            # na_position controls where missing timestamps end up when sorting
            df_sorted = df.sort_values(by=sort_cols, ascending=ascending, na_position="first")
            # drop_duplicates keeps the last row, i.e. the "latest" one
            deduped = df_sorted.drop_duplicates(subset=[pk], keep="last")
        else:
            deduped = df.drop_duplicates(subset=[pk], keep="last")
    else:
        deduped = df.drop_duplicates(subset=[pk], keep="last")

    removed = pd.concat([df, deduped]).drop_duplicates(keep=False)
    return deduped.reset_index(drop=True), removed.reset_index(drop=True)


def is_probably_boolean_col(name: str, series: pd.Series) -> bool:
    """Decide whether a column looks like a boolean/flag column, based on its name and values."""
    name_hit = bool(re.search(r"(is_|has_|flag|active|valid|enabled|deleted|available|status)$", name))
    # For low-cardinality columns, also inspect the values
    if series.dtype == object or pd.api.types.is_categorical_dtype(series):
        vals = pd.Series(series.dropna().astype(str).str.lower().str.strip().unique())
        val_hit = vals.isin(["y", "n", "yes", "no", "true", "false", "0", "1", "是", "否"]).mean() >= 0.6
    else:
        val_hit = False
    return name_hit or val_hit


def canonicalize_category_value(x: object) -> object:
    """Generic categorical cleaning (case, whitespace, full/half width, unified nulls)."""
    if pd.isna(x):
        return np.nan
    s = str(x)
    s = unicodedata.normalize("NFKC", s)
    s = s.strip().lower()
    s = re.sub(r"\s+", " ", s)
    if s in ["", "nan", "none", "null", "na"]:
        return np.nan
    return s


def normalize_boolean_value(x: object) -> object:
    """Normalize boolean-like values to 'yes'/'no' (kept as strings for consistent encoding)."""
    if pd.isna(x):
        return np.nan
    s = str(x).strip().lower()
    mapping = {
        "y": "yes", "yes": "yes", "true": "yes", "1": "yes", "t": "yes", "是": "yes",
        "n": "no", "no": "no", "false": "no", "0": "no", "f": "no", "否": "no"
    }
    return mapping.get(s, canonicalize_category_value(s))


def infer_categorical_columns(df: pd.DataFrame, max_unique: int = 1000, unique_ratio: float = 0.5) -> List[str]:
    """Auto-detect categorical columns: object/category columns with low cardinality or a low unique ratio."""
    cats = []
    for c in df.columns:
        if df[c].dtype == object or pd.api.types.is_categorical_dtype(df[c]):
            nunique = df[c].nunique(dropna=True)
            if nunique <= max_unique and (nunique / max(1, len(df))) <= unique_ratio:
                cats.append(c)
    return cats


def build_global_category_vocab(
    dfs: List[pd.DataFrame],
    cat_cols: List[str],
    boolean_cols: Optional[List[str]] = None
) -> Dict[str, Dict[str, int]]:
    """
    Build a globally consistent encoding vocabulary: collect the values of the given columns
    across all tables and map them to stable integer codes.
    - boolean columns are first normalized to 'yes'/'no'
    - other columns go through the generic cleaning
    """
    vocab = {}
    boolean_cols = set(boolean_cols or [])
    for c in cat_cols:
        vals = []
        for df in dfs:
            if c in df.columns:
                series = df[c]
                if c in boolean_cols:
                    series = series.map(normalize_boolean_value)
                else:
                    series = series.map(canonicalize_category_value)
                vals.append(series.dropna().unique())
        if not vals:
            continue
        all_vals = pd.unique(pd.Series(np.concatenate(vals, axis=0)))
        # Stable ordering (length first, then lexicographic, to reduce code churn between near-identical spellings)
        all_vals_sorted = sorted(all_vals, key=lambda s: (len(str(s)), str(s)))
        vocab[c] = {str(v): i for i, v in enumerate(all_vals_sorted, start=1)}  # codes start at 1; 0 is reserved for missing
    return vocab


def apply_categorical_encoding(
    df: pd.DataFrame,
    vocab: Dict[str, Dict[str, int]],
    boolean_cols: Optional[List[str]] = None,
    drop_original: bool = False,
    suffix: str = "_enc"
) -> pd.DataFrame:
    """Apply the consistent encoding, producing integer *_enc columns; optionally drop the originals."""
    out = df.copy()
    boolean_cols = set(boolean_cols or [])
    for c, mapping in vocab.items():
        if c not in out.columns:
            continue
        cleaned = out[c].map(normalize_boolean_value if c in boolean_cols else canonicalize_category_value)
        enc = cleaned.map(lambda v: (0 if pd.isna(v) else mapping.get(str(v), 0))).astype("Int32")
        new_col = f"{c}{suffix}"
        out[new_col] = enc
        if drop_original:
            out.drop(columns=[c], inplace=True)
    return out


def save_metadata(
    output_dir: str,
    artifacts: Dict[str, object],
    fname: str = "metadata.json"
):
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, fname)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(artifacts, f, ensure_ascii=False, indent=2)


# ============ Main flow ============

def run_pipeline(
    transactions_path: str,
    users_path: str,
    output_dir: str,
    transactions_pk: str,
    users_pk: str,
    target_tz: Optional[str] = "UTC",
    naive_source_tz: Optional[str] = None,
    transactions_time_priority: Optional[List[str]] = None,
    users_time_priority: Optional[List[str]] = None,
    infer_cats: bool = True,
    explicit_cat_cols: Optional[List[str]] = None,
    drop_original_cats: bool = False
):
    # 1) Load
    tx = read_any(transactions_path)
    us = read_any(users_path)

    # 2) Column-name normalization
    tx = standardize_columns(tx)
    us = standardize_columns(us)
    transactions_pk = standardize_colname(transactions_pk)
    users_pk = standardize_colname(users_pk)
    transactions_time_priority = [standardize_colname(c) for c in (transactions_time_priority or [])]
    users_time_priority = [standardize_colname(c) for c in (users_time_priority or [])]
    if explicit_cat_cols:
        explicit_cat_cols = [standardize_colname(c) for c in explicit_cat_cols]

    # 3) Timezone unification (auto-detect datetime columns and convert to target_tz;
    #    naive values are localized with naive_source_tz)
    tx = unify_timezones(tx, tz_target=target_tz, tz_source_naive=naive_source_tz)
    us = unify_timezones(us, tz_target=target_tz, tz_source_naive=naive_source_tz)

    # 4) Primary-key deduplication (keep the latest record by default)
    tx_dedup, tx_removed = deduplicate_by_pk(
        tx, pk=transactions_pk, time_priority=transactions_time_priority, ascending=True
    )
    us_dedup, us_removed = deduplicate_by_pk(
        us, pk=users_pk, time_priority=users_time_priority, ascending=True
    )

    # 5) Categorical column selection
    if explicit_cat_cols:
        cat_cols = explicit_cat_cols
    else:
        # Prefer the intersection of both tables' candidates for cross-table consistency;
        # fall back to the union when the intersection is empty
        tx_cats = infer_categorical_columns(tx_dedup) if infer_cats else []
        us_cats = infer_categorical_columns(us_dedup) if infer_cats else []
        inter = sorted(list(set(tx_cats).intersection(us_cats)))
        union = sorted(list(set(tx_cats).union(us_cats)))
        cat_cols = inter if inter else union

    # Infer boolean-like columns from names/values
    boolean_cols = []
    for c in cat_cols:
        s_tx = tx_dedup[c] if c in tx_dedup.columns else pd.Series([], dtype=object)
        s_us = us_dedup[c] if c in us_dedup.columns else pd.Series([], dtype=object)
        s = pd.concat([s_tx, s_us], ignore_index=True)
        if is_probably_boolean_col(c, s):
            boolean_cols.append(c)

    # 6) Build and apply the global encoding vocabulary
    vocab = build_global_category_vocab([tx_dedup, us_dedup], cat_cols=cat_cols, boolean_cols=boolean_cols)
    tx_std = apply_categorical_encoding(
        tx_dedup, vocab=vocab, boolean_cols=boolean_cols, drop_original=drop_original_cats
    )
    us_std = apply_categorical_encoding(
        us_dedup, vocab=vocab, boolean_cols=boolean_cols, drop_original=drop_original_cats
    )

    # 7) Outputs
    os.makedirs(output_dir, exist_ok=True)
    # Data
    write_any(tx_std, os.path.join(output_dir, "transactions_standardized.parquet"))
    write_any(us_std, os.path.join(output_dir, "users_standardized.parquet"))
    # Deduplication reports
    tx_removed.to_csv(os.path.join(output_dir, "transactions_removed_duplicates.csv"), index=False)
    us_removed.to_csv(os.path.join(output_dir, "users_removed_duplicates.csv"), index=False)
    # Vocabulary and metadata
    artifacts = {
        "target_timezone": target_tz,
        "naive_source_timezone": naive_source_tz,
        "transactions": {
            "path": transactions_path,
            "primary_key": transactions_pk,
            "time_priority": transactions_time_priority,
            "rows_in": int(len(tx)),
            "rows_out": int(len(tx_std)),
            "duplicates_removed": int(len(tx_removed)),
        },
        "users": {
            "path": users_path,
            "primary_key": users_pk,
            "time_priority": users_time_priority,
            "rows_in": int(len(us)),
            "rows_out": int(len(us_std)),
            "duplicates_removed": int(len(us_removed)),
        },
        "categorical_columns": cat_cols,
        "boolean_like_columns": boolean_cols,
        "categorical_vocab": vocab,
        "dtype_after_transactions": {c: str(tx_std[c].dtype) for c in tx_std.columns},
        "dtype_after_users": {c: str(us_std[c].dtype) for c in us_std.columns},
    }
    save_metadata(output_dir, artifacts, fname="metadata.json")

    print("Standardization finished:")
    print(f"- transactions rows in/out: {len(tx)} -> {len(tx_std)}; duplicates removed: {len(tx_removed)}")
    print(f"- users rows in/out: {len(us)} -> {len(us_std)}; duplicates removed: {len(us_removed)}")
    print(f"- output directory: {output_dir}")


def parse_args():
    p = argparse.ArgumentParser(description="Standardization script for transactions and users tables")
    p.add_argument("--transactions-path", required=True, help="Transactions table path (csv/parquet/feather)")
    p.add_argument("--users-path", required=True, help="Users table path (csv/parquet/feather)")
    p.add_argument("--output-dir", required=True, help="Output directory")
    p.add_argument("--transactions-pk", required=True, help="Primary key column of the transactions table")
    p.add_argument("--users-pk", required=True, help="Primary key column of the users table")
    p.add_argument("--target-tz", default="UTC", help="Target timezone (e.g. UTC or Asia/Shanghai)")
    p.add_argument("--naive-source-tz", default=None, help="Source timezone assumed for timestamps without timezone info")
    p.add_argument("--transactions-time-priority", default=None,
                   help="Comma-separated time-priority columns for transactions (e.g. event_time,updated_at)")
    p.add_argument("--users-time-priority", default=None,
                   help="Comma-separated time-priority columns for users (e.g. updated_at,created_at)")
    p.add_argument("--infer-categorical", action="store_true",
                   help="Auto-detect categorical columns (otherwise only --categorical-cols is used)")
    p.add_argument("--categorical-cols", default=None,
                   help="Comma-separated columns to encode consistently (overrides auto-detection when given)")
    p.add_argument("--drop-original-cats", action="store_true",
                   help="Drop the original categorical columns after encoding")
    args = p.parse_args()

    def split_cols(v: Optional[str]) -> Optional[List[str]]:
        if v is None or str(v).strip() == "":
            return None
        return [s.strip() for s in v.split(",") if s.strip()]

    return {
        "transactions_path": args.transactions_path,
        "users_path": args.users_path,
        "output_dir": args.output_dir,
        "transactions_pk": args.transactions_pk,
        "users_pk": args.users_pk,
        "target_tz": args.target_tz,
        "naive_source_tz": args.naive_source_tz,
        "transactions_time_priority": split_cols(args.transactions_time_priority),
        "users_time_priority": split_cols(args.users_time_priority),
        "infer_cats": bool(args.infer_categorical),
        "explicit_cat_cols": split_cols(args.categorical_cols),
        "drop_original_cats": bool(args.drop_original_cats),
    }


if __name__ == "__main__":
    params = parse_args()
    try:
        run_pipeline(**params)
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
```

Notes and suggestions:

- Non-unique primary keys: the script sorts by the time-priority columns and keeps the "latest" record; pass --transactions-time-priority and --users-time-priority to define the priority. Without them, only row order decides which duplicate survives.
- Timezone unification: timestamp columns with mixed formats and mixed tz-awareness are parsed value by value; specify --naive-source-tz so naive timestamps are not silently treated as UTC, and choose the target with --target-tz.
- Inconsistent categorical encoding: use --infer-categorical to auto-detect categorical columns or --categorical-cols to name them explicitly. A single cross-table vocabulary is built and written to categorical_vocab in metadata.json.
- Performance: value-level timestamp parsing can be slow on very large data. If the timestamp columns are known and uniformly formatted, convert them to datetime in a vectorized way first (see the sketch below) and then run the dedup/encoding steps, or rework unify_timezones itself to be vectorized.
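For the performance note, a vectorized alternative to the per-value parsing might look like the sketch below; the column name event_time and the source timezone Asia/Shanghai are assumptions to adjust to your data, and it presumes the column is uniformly formatted:

```python
import pandas as pd

df = pd.read_csv("transactions.csv")

# Parse the whole column at once instead of value by value.
ts = pd.to_datetime(df["event_time"], errors="coerce")

if ts.dt.tz is None:
    # Naive timestamps: localize to the known source timezone first.
    ts = ts.dt.tz_localize("Asia/Shanghai", ambiguous="NaT", nonexistent="NaT")

# Convert everything to the target timezone.
df["event_time"] = ts.dt.tz_convert("UTC")
```

Series-level tz_localize/tz_convert operate on the entire column and are usually far faster than the .apply path in unify_timezones.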
Below is a self-contained, reproducible Python script that standardizes datasets containing continuous variables, categorical variables, and timestamps. It ensures unified categorical encoding across small-sample, multi-batch scenarios and saves the fitted preprocessing artifacts for consistent application to future batches.

Key properties:
- Deterministic, reproducible preprocessing and encoding.
- Unified categorical encoding with explicit handling for missing and unseen categories.
- Timestamp normalization to UTC epoch seconds, included in numeric standardization.
- Artifacts (pipeline and metadata) are saved and reused across batches.

Script:

```python
#!/usr/bin/env python3
"""
Dataset Standardization Script: continuous + categorical + timestamp

- Fit mode: builds and saves preprocessing artifacts from training batches.
- Transform mode: applies saved artifacts to new batches, producing standardized output.

Requirements:
- Python 3.8+
- pandas >= 1.3
- numpy >= 1.20
- scikit-learn >= 1.2
- joblib

Example usage:

  Fit on training directory:
    python standardize.py fit \
        --input_dir ./train_batches \
        --schema ./schema.json \
        --output_dir ./artifacts

  Transform a new batch:
    python standardize.py transform \
        --input_file ./batch_new.csv \
        --artifacts_dir ./artifacts \
        --output_file ./batch_new_standardized.csv

Schema (JSON) example:
{
  "continuous": ["num_col1", "num_col2"],
  "categorical": ["cat_a", "cat_b"],
  "timestamp": ["event_time"],
  "timestamp_parsing": {
    "format": null,        // optional strftime format; null = auto
    "timezone": "UTC"      // timezone of input if naive; converted to UTC
  },
  "imputation": {
    "numeric_strategy": "median",
    "categorical_missing_token": "__MISSING__",
    "unseen_token": "__UNSEEN__"
  }
}
"""
import argparse
import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer


# -----------------------
# Utilities
# -----------------------

def _load_schema(schema_path: str) -> Dict:
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)
    # Basic validation
    for key in ["continuous", "categorical", "timestamp"]:
        if key not in schema or not isinstance(schema[key], list):
            schema[key] = []
    # Defaults
    ts_parsing = schema.get("timestamp_parsing", {})
    schema["timestamp_parsing"] = {
        "format": ts_parsing.get("format", None),
        "timezone": ts_parsing.get("timezone", "UTC"),
    }
    imp = schema.get("imputation", {})
    schema["imputation"] = {
        "numeric_strategy": imp.get("numeric_strategy", "median"),
        "categorical_missing_token": imp.get("categorical_missing_token", "__MISSING__"),
        "unseen_token": imp.get("unseen_token", "__UNSEEN__"),
    }
    return schema


def _read_csvs_from_dir(input_dir: str) -> List[pd.DataFrame]:
    files = sorted([str(p) for p in Path(input_dir).glob("*.csv")])
    if not files:
        raise FileNotFoundError(f"No CSV files found in directory: {input_dir}")
    dfs = []
    for fp in files:
        df = pd.read_csv(fp)
        dfs.append(df)
    return dfs


def _read_csv(input_file: str) -> pd.DataFrame:
    return pd.read_csv(input_file)


def _parse_timestamp_series(
    s: pd.Series,
    fmt: Optional[str],
    tz: Optional[str],
) -> pd.Series:
    # Robust parsing: if fmt is given, use it; otherwise let pandas infer
    if fmt:
        dt = pd.to_datetime(s, format=fmt, errors="coerce")
    else:
        dt = pd.to_datetime(s, errors="coerce")
    # Handle timezone: if naive, localize to tz; then convert to UTC
    tz_in = tz or "UTC"
    if dt.dt.tz is None:
        # localize naive timestamps to tz_in
        try:
            dt = dt.dt.tz_localize(tz_in)
        except Exception:
            # If localization fails (e.g., invalid tz), fall back to UTC
            dt = dt.dt.tz_localize("UTC")
    # Convert to UTC
    dt = dt.dt.tz_convert("UTC")
    # Convert to epoch seconds (float); NaT propagates as NaN
    epoch_seconds = (dt - pd.Timestamp("1970-01-01", tz="UTC")) / pd.Timedelta(seconds=1)
    return epoch_seconds


def _preprocess_timestamps(
    df: pd.DataFrame,
    timestamp_cols: List[str],
    ts_format: Optional[str],
    tz: Optional[str],
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Converts timestamp columns to numeric epoch seconds in UTC.
    Returns:
      - df with additional numeric columns "<col>__epoch_seconds"
      - list of new timestamp feature names
    """
    ts_numeric_cols = []
    for col in timestamp_cols:
        if col not in df.columns:
            # missing timestamp column; create NaNs
            df[col] = np.nan
        epoch = _parse_timestamp_series(df[col], ts_format, tz)
        new_col = f"{col}__epoch_seconds"
        df[new_col] = epoch.astype(float)
        ts_numeric_cols.append(new_col)
        # Drop original timestamp column to avoid accidental reuse
        df.drop(columns=[col], inplace=True)
    return df, ts_numeric_cols


def _collect_categorical_sets(
    dfs: List[pd.DataFrame],
    categorical_cols: List[str],
    missing_token: str,
    unseen_token: str,
) -> Dict[str, List[str]]:
    """
    Collects sorted unique categorical values across all training batches.
    Adds missing_token and unseen_token explicitly to categories for stable encoding.
    """
    cat_values = {c: set() for c in categorical_cols}
    for df in dfs:
        for c in categorical_cols:
            if c not in df.columns:
                continue
            # Convert to string to enforce consistent dtype
            vals = df[c].astype("string")
            # Exclude NA here; missing handled via imputation, but still add missing_token explicitly
            uniq = set([str(v) for v in vals.dropna().unique().tolist()])
            cat_values[c].update(uniq)

    categories = {}
    for c in categorical_cols:
        base = sorted(cat_values[c]) if cat_values[c] else []
        # Ensure deterministic presence and order of special tokens
        if missing_token not in base:
            base.append(missing_token)
        if unseen_token not in base:
            base.append(unseen_token)
        categories[c] = base
    return categories


def _map_unseen_categories(
    df: pd.DataFrame,
    categorical_cols: List[str],
    known_categories: Dict[str, List[str]],
    missing_token: str,
    unseen_token: str,
) -> pd.DataFrame:
    """
    Maps categorical values not in known_categories to unseen_token.
    Explicitly fills missing with missing_token.
    """
    for c in categorical_cols:
        if c not in df.columns:
            # create column filled as missing_token
            df[c] = missing_token
        # Ensure string dtype
        s = df[c].astype("string")
        known_set = set(known_categories[c])
        s = s.fillna(missing_token)
        df[c] = s.where(s.isin(known_set), other=unseen_token).astype("string")
    return df


def _build_preprocessor(
    numeric_cols: List[str],
    categorical_cols: List[str],
    categories_map: Dict[str, List[str]],
    numeric_impute_strategy: str,
    missing_token: str,
) -> ColumnTransformer:
    """
    Builds the ColumnTransformer with:
      - Numeric: SimpleImputer + StandardScaler
      - Categorical: SimpleImputer (constant missing_token) + OneHotEncoder with fixed categories
    """
    numeric_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy=numeric_impute_strategy)),
            ("scaler", StandardScaler(with_mean=True, with_std=True)),
        ]
    )

    # OneHot with fixed categories in deterministic order
    cat_categories = [categories_map[c] for c in categorical_cols]
    categorical_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value=missing_token)),
            # sparse_output requires scikit-learn >= 1.2 (the old `sparse` flag was removed in 1.4)
            ("onehot", OneHotEncoder(categories=cat_categories, handle_unknown="ignore", sparse_output=False)),
        ]
    )

    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_pipeline, numeric_cols),
            ("cat", categorical_pipeline, categorical_cols),
        ],
        remainder="drop",
        verbose_feature_names_out=True,
    )
    return preprocessor


def _fit(
    input_dir: str,
    schema_path: str,
    output_dir: str,
) -> None:
    os.makedirs(output_dir, exist_ok=True)
    schema = _load_schema(schema_path)

    continuous = schema["continuous"]
    categorical = schema["categorical"]
    timestamp = schema["timestamp"]
    ts_fmt = schema["timestamp_parsing"]["format"]
    tz = schema["timestamp_parsing"]["timezone"]
    numeric_strategy = schema["imputation"]["numeric_strategy"]
    missing_token = schema["imputation"]["categorical_missing_token"]
    unseen_token = schema["imputation"]["unseen_token"]

    dfs = _read_csvs_from_dir(input_dir)

    # Collect categories across all batches
    categories_map = _collect_categorical_sets(dfs, categorical, missing_token, unseen_token)

    # Concatenate for fitting scaler statistics
    # For small samples, we can load all to memory
    df_all = pd.concat(dfs, axis=0, ignore_index=True)

    # Timestamp preprocessing -> numeric epoch seconds
    df_all, ts_numeric_cols = _preprocess_timestamps(df_all, timestamp, ts_fmt, tz)

    # Enforce dtypes and map unknowns for categorical columns using collected categories
    df_all = _map_unseen_categories(df_all, categorical, categories_map, missing_token, unseen_token)

    # Numeric columns include continuous + engineered timestamp numeric cols
    numeric_cols = list(continuous) + list(ts_numeric_cols)

    # Build and fit preprocessor
    preprocessor = _build_preprocessor(
        numeric_cols=numeric_cols,
        categorical_cols=categorical,
        categories_map=categories_map,
        numeric_impute_strategy=numeric_strategy,
        missing_token=missing_token,
    )
    preprocessor.fit(df_all)

    # Persist artifacts
    joblib.dump(preprocessor, os.path.join(output_dir, "preprocessor.joblib"))

    # Derive output feature names for reproducibility
    try:
        feature_names = preprocessor.get_feature_names_out().tolist()
    except Exception:
        feature_names = None

    metadata = {
        "schema": {
            "continuous": continuous,
            "categorical": categorical,
            "timestamp": timestamp,
            "timestamp_parsing": {"format": ts_fmt, "timezone": tz},
            "imputation": {
                "numeric_strategy": numeric_strategy,
                "categorical_missing_token": missing_token,
                "unseen_token": unseen_token,
            },
        },
        "categories_map": categories_map,    # fixed categories per categorical column
        "numeric_cols": numeric_cols,        # includes timestamp-derived numeric columns
        "feature_names_out": feature_names,  # final standardized feature names (if available)
        "version": "1.0",
    }
    with open(os.path.join(output_dir, "metadata.json"), "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)

    print(f"Fit complete. Artifacts saved to: {output_dir}")


def _transform(
    input_file: str,
    artifacts_dir: str,
    output_file: str,
) -> None:
    # Load artifacts
    preprocessor = joblib.load(os.path.join(artifacts_dir, "preprocessor.joblib"))
    with open(os.path.join(artifacts_dir, "metadata.json"), "r", encoding="utf-8") as f:
        metadata = json.load(f)

    schema = metadata["schema"]
    categories_map = metadata["categories_map"]
    continuous = schema["continuous"]
    categorical = schema["categorical"]
    timestamp = schema["timestamp"]
    ts_fmt = schema["timestamp_parsing"]["format"]
    tz = schema["timestamp_parsing"]["timezone"]
    missing_token = schema["imputation"]["categorical_missing_token"]
    unseen_token = schema["imputation"]["unseen_token"]

    # Read input batch
    df = _read_csv(input_file)

    # Timestamp preprocessing
    df, ts_numeric_cols = _preprocess_timestamps(df, timestamp, ts_fmt, tz)
    numeric_cols = list(continuous) + list(ts_numeric_cols)

    # Map unseen and missing categories deterministically
    df = _map_unseen_categories(df, categorical, categories_map, missing_token, unseen_token)

    # Align missing numeric columns if absent
    for c in numeric_cols:
        if c not in df.columns:
            df[c] = np.nan

    # Align missing categorical columns if absent
    for c in categorical:
        if c not in df.columns:
            df[c] = missing_token

    # Transform
    X = preprocessor.transform(df)

    # Feature names
    feature_names = metadata.get("feature_names_out", None)
    if feature_names is None:
        # Fallback: generate placeholder names
        feature_names = [f"f{i}" for i in range(X.shape[1])]

    # Save standardized output
    out_df = pd.DataFrame(X, columns=feature_names)
    out_df.to_csv(output_file, index=False)
    print(f"Transform complete. Output saved to: {output_file}")


def main():
    parser = argparse.ArgumentParser(
        description="Standardize dataset across batches (continuous + categorical + timestamp)."
    )
    subparsers = parser.add_subparsers(dest="mode", required=True)

    p_fit = subparsers.add_parser("fit", help="Fit preprocessing on training batches.")
    p_fit.add_argument("--input_dir", type=str, required=True, help="Directory with training CSV batches.")
    p_fit.add_argument("--schema", type=str, required=True, help="Path to JSON schema.")
    p_fit.add_argument("--output_dir", type=str, required=True, help="Directory to save artifacts.")

    p_transform = subparsers.add_parser("transform", help="Apply saved preprocessing to a new batch.")
    p_transform.add_argument("--input_file", type=str, required=True, help="CSV file to transform.")
    p_transform.add_argument("--artifacts_dir", type=str, required=True, help="Directory with saved artifacts.")
    p_transform.add_argument("--output_file", type=str, required=True, help="Where to write standardized CSV.")

    args = parser.parse_args()
    if args.mode == "fit":
        _fit(args.input_dir, args.schema, args.output_dir)
    elif args.mode == "transform":
        _transform(args.input_file, args.artifacts_dir, args.output_file)
    else:
        raise ValueError("Unsupported mode.")


if __name__ == "__main__":
    main()
```

Notes:
- Unified categorical encoding: categories are collected during fit, sorted deterministically, and augmented with __MISSING__ and __UNSEEN__ tokens to preserve a stable column space across batches. During transform, any value not in the known set maps to __UNSEEN__ (a toy sketch follows below).
- Timestamp handling: timestamps are parsed, localized if naive, converted to UTC, and normalized to epoch seconds. These become part of the numeric feature set and are standardized via z-scores.
- Imputation: numeric features use median imputation; categorical features use a constant token for missing values.
- Artifacts: the fitted ColumnTransformer and metadata (schema, categories, feature names) are saved and reused to ensure consistent preprocessing across batches.
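As a toy illustration of the stable column space (the column name "color" is hypothetical and not part of the script), freezing the category list at fit time means a later batch containing an unseen value still produces the same output width:

```python
import pandas as pd
from sklearn.preprocessing import OneHotEncoder

# Categories frozen at fit time, including the two special tokens.
categories = [["blue", "red", "__MISSING__", "__UNSEEN__"]]
enc = OneHotEncoder(categories=categories, handle_unknown="ignore", sparse_output=False)
enc.fit(pd.DataFrame({"color": ["red", "blue"]}))

# A later batch with an unseen value; the script maps it to __UNSEEN__
# before encoding, so the encoded width never changes.
batch = pd.DataFrame({"color": ["green"]})
batch["color"] = batch["color"].where(batch["color"].isin(categories[0]), "__UNSEEN__")
print(enc.transform(batch))  # -> [[0. 0. 0. 1.]]
```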
Quickly generate a unified standardization script to batch-process multiple datasets, shortening exploration and modeling preparation time and making conclusions more reliable.
Establish consistent data definitions for reports and dashboards, with automated cleaning and encoding that cut repetitive work and disputes over metrics and keep release cadence stable.
Record preprocessing in a reproducible script, support repeated experimental comparisons, and safeguard the consistency and credibility of paper and project data.
Rapidly assess whether the required data is usable, produce a standardization plan with accompanying documentation, drive cross-team review and adoption, and lower communication costs.
Normalize channel and campaign data in one step to obtain usable metrics and features quickly, accelerating A/B testing, retrospectives, and ad-spend optimization.
Use the generated script as a teaching example to walk through the normalization workflow and best practices, improving learners' hands-on skills and assignment quality.
Obtain a runnable script without a deep technical background, quickly build a data foundation, and validate product hypotheses and market feedback.
Turn the standardization steps into reusable modules, unify the team's preprocessing conventions, and reduce maintenance cost and rework risk.
Helps data and business teams quickly turn messy raw data into analysis-ready standardized data. With one well-phrased request, it automatically generates a Python standardization script tailored to your data, covering the common cleaning steps: missing-value handling, outlier treatment, numeric scaling, categorical unification, and time-format normalization, together with clear usage steps and caveats. Data preparation shrinks from hours to minutes, consistency and data quality improve, reliance on senior engineering resources drops, and reporting and strategy validation speed up, driving greater efficiency and steadier business decisions.
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to individual, lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in an MCP client so your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.