Writing Data Standardization Scripts

Write a Python script that standardizes a given dataset.

Example 1

The following script standardizes and uniformly encodes generic tabular data of roughly 100k rows (numeric, categorical, and date fields, with missing values and outliers). Core processing:
- Numeric fields: median imputation of missing values, IQR-based outlier clipping, standardization (StandardScaler).
- Categorical fields: text normalization (Unicode NFKC, whitespace trimming, lowercasing), folding of rare categories (to avoid dimensionality explosion), consistent one-hot encoding (handle_unknown="ignore").
- Date fields: automatic detection and parsing to datetime, median imputation of missing values, derived numeric features (year/month/day/weekday, etc.), standardization.
- Unified encoding: UTF-8 output and uniform string normalization, so differing encodings cannot yield inconsistent categories.
- Reusability: organized with scikit-learn Pipeline/ColumnTransformer to guarantee train/inference consistency; outputs both the processed data and the fitted preprocessor.

Usage (command-line example):
- Install dependencies: pip install pandas scikit-learn joblib pyarrow
- Run: python preprocess_standardize.py --input data.csv --output processed.parquet --sep , --encoding utf-8

Script preprocess_standardize.py:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import os
import re
import unicodedata
import warnings
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd
from joblib import dump
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

warnings.filterwarnings("ignore", category=FutureWarning)

# ===================== 工具函数 =====================

def normalize_text(s: str) -> str:
    """统一文本编码与格式:Unicode NFKC,去两端空白,压缩中间空白为单个空格,小写。
    保留原始内容的语义,不做翻译或映射。
    """
    if s is None:
        return s
    # 规范化
    s = unicodedata.normalize("NFKC", str(s))
    # 去两端空白
    s = s.strip()
    # 压缩多个空白
    s = re.sub(r"\s+", " ", s)
    # 小写
    s = s.lower()
    return s


def is_date_like_series(series: pd.Series, sample_size: int = 5000, threshold: float = 0.8) -> bool:
    """基于抽样的启发式日期识别:尝试 to_datetime,成功率 >= threshold 认为是日期字段。"""
    s = series.dropna()
    if s.empty:
        return False
    n = min(len(s), sample_size)
    sample = s.sample(n, random_state=42) if len(s) > n else s
    parsed = pd.to_datetime(sample, errors="coerce")
    success_rate = parsed.notna().mean()
    return success_rate >= threshold


def infer_column_types(df: pd.DataFrame) -> Tuple[List[str], List[str], List[str]]:
    """推断数值、日期、分类型列。"""
    numeric_cols = df.select_dtypes(include=["number"]).columns.tolist()

    # 候选对象列用于日期与分类型识别
    obj_cols = df.select_dtypes(include=["object"]).columns.tolist()
    bool_cols = df.select_dtypes(include=["bool"]).columns.tolist()
    cat_dtype_cols = [c for c in df.columns if str(df[c].dtype) == "category"]

    # 先识别日期列
    date_cols = []
    for c in obj_cols:
        try:
            if is_date_like_series(df[c]):
                date_cols.append(c)
        except Exception:
            continue
    # 剩余对象列作为分类型
    categorical_cols = [c for c in obj_cols if c not in date_cols] + bool_cols + cat_dtype_cols

    # 去重与顺序
    categorical_cols = list(dict.fromkeys(categorical_cols))
    date_cols = list(dict.fromkeys(date_cols))
    numeric_cols = list(dict.fromkeys(numeric_cols))

    # 排除重叠
    numeric_cols = [c for c in numeric_cols if c not in date_cols]
    categorical_cols = [c for c in categorical_cols if c not in numeric_cols and c not in date_cols]

    return numeric_cols, date_cols, categorical_cols


# ===================== 自定义变换器 =====================

class NumericOutlierClipperImputer(BaseEstimator, TransformerMixin):
    """数值列:IQR裁剪异常值并用中位数填充缺失。"""
    def __init__(self, columns: List[str], iqr_k: float = 1.5):
        self.columns = columns
        self.iqr_k = iqr_k
        self.bounds_: Dict[str, Tuple[float, float]] = {}
        self.medians_: Dict[str, float] = {}

    def fit(self, X: pd.DataFrame, y=None):
        for c in self.columns:
            col = pd.to_numeric(X[c], errors="coerce")
            q1 = col.quantile(0.25)
            q3 = col.quantile(0.75)
            iqr = q3 - q1
            lower = q1 - self.iqr_k * iqr
            upper = q3 + self.iqr_k * iqr
            self.bounds_[c] = (lower, upper)
            self.medians_[c] = col.median()
        return self

    def transform(self, X: pd.DataFrame):
        X_out = X.copy()
        for c in self.columns:
            col = pd.to_numeric(X_out[c], errors="coerce")
            lower, upper = self.bounds_[c]
            col = col.clip(lower=lower, upper=upper)
            col = col.fillna(self.medians_[c])
            X_out[c] = col
        return X_out[self.columns]


class RareCategoryGrouper(BaseEstimator, TransformerMixin):
    """分类型列:将低频类别折叠为 '__rare__'(避免OHE维度爆炸)。
    可按比例 min_freq 或绝对阈值 min_count 控制。
    """
    def __init__(self, columns: List[str], min_freq: float = 0.01, min_count: int = None):
        self.columns = columns
        self.min_freq = min_freq
        self.min_count = min_count
        self.frequent_cats_: Dict[str, set] = {}

    def fit(self, X: pd.DataFrame, y=None):
        n = len(X)
        for c in self.columns:
            s = X[c].astype("object")
            vc = s.value_counts(dropna=False)
            if self.min_count is not None:
                threshold_count = self.min_count
            else:
                threshold_count = max(1, int(np.floor(self.min_freq * n)))
            keep = set(vc[vc >= threshold_count].index.tolist())
            # 保证保留缺失占位
            keep.add("__missing__")
            self.frequent_cats_[c] = keep
        return self

    def transform(self, X: pd.DataFrame):
        X_out = X.copy()
        for c in self.columns:
            s = X_out[c].astype("object")
            s = s.where(s.isin(self.frequent_cats_[c]), "__rare__")
            X_out[c] = s
        return X_out[self.columns]


class CategoricalTextNormalizer(BaseEstimator, TransformerMixin):
    """分类型列文本统一:NFKC、strip、空白压缩、小写;缺失值统一为'__missing__'。"""
    def __init__(self, columns: List[str]):
        self.columns = columns

    def fit(self, X: pd.DataFrame, y=None):
        return self

    def transform(self, X: pd.DataFrame):
        X_out = X.copy()
        for c in self.columns:
            s = X_out[c].astype("object")
            s = s.map(lambda v: "__missing__" if pd.isna(v) else normalize_text(v))
            # 空字符串也视为缺失占位
            s = s.replace("", "__missing__")
            X_out[c] = s
        return X_out[self.columns]


class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    """日期列派生统一数值特征并标准化缺失:中位数填充;输出派生特征(不保留原日期列)。
    提取:year, month, day, dayofweek, dayofyear, week, is_month_start, is_month_end, is_quarter_start, is_quarter_end.
    """
    def __init__(self, columns: List[str]):
        self.columns = columns
        self.medians_: Dict[str, pd.Timestamp] = {}
        self.feature_names_: List[str] = []

    def fit(self, X: pd.DataFrame, y=None):
        for c in self.columns:
            dt = pd.to_datetime(X[c], errors="coerce")
            med = dt.dropna().median()
            # 若全NaT,回退到Unix epoch
            if pd.isna(med):
                med = pd.Timestamp("1970-01-01")
            self.medians_[c] = med
        # 预先计算输出列名
        feats = ["year", "month", "day", "dayofweek", "dayofyear", "week",
                 "is_month_start", "is_month_end", "is_quarter_start", "is_quarter_end"]
        self.feature_names_ = [f"{c}__{f}" for c in self.columns for f in feats]
        return self

    def transform(self, X: pd.DataFrame):
        out = {}
        for c in self.columns:
            dt = pd.to_datetime(X[c], errors="coerce")
            dt = dt.fillna(self.medians_[c])
            # isocalendar().week 返回UInt32,需要转为int
            iso = dt.dt.isocalendar()
            out[f"{c}__year"] = dt.dt.year.astype(np.int32)
            out[f"{c}__month"] = dt.dt.month.astype(np.int32)
            out[f"{c}__day"] = dt.dt.day.astype(np.int32)
            out[f"{c}__dayofweek"] = dt.dt.dayofweek.astype(np.int32)
            out[f"{c}__dayofyear"] = dt.dt.dayofyear.astype(np.int32)
            out[f"{c}__week"] = iso.week.astype("int32")
            out[f"{c}__is_month_start"] = dt.dt.is_month_start.astype(np.int8)
            out[f"{c}__is_month_end"] = dt.dt.is_month_end.astype(np.int8)
            out[f"{c}__is_quarter_start"] = dt.dt.is_quarter_start.astype(np.int8)
            out[f"{c}__is_quarter_end"] = dt.dt.is_quarter_end.astype(np.int8)
        return pd.DataFrame(out, index=X.index)


# ===================== 主流程 =====================

def build_preprocessor(df: pd.DataFrame) -> Tuple[Pipeline, List[str]]:
    """构建整体预处理器(ColumnTransformer + Pipelines),返回预处理器与输出特征名。"""
    numeric_cols, date_cols, categorical_cols = infer_column_types(df)

    # 数值管线:裁剪+填充+标准化
    numeric_pipe = Pipeline(steps=[
        ("clip_impute", NumericOutlierClipperImputer(columns=numeric_cols, iqr_k=1.5)),
        ("scaler", StandardScaler(with_mean=True, with_std=True))
    ])

    # 日期管线:派生特征 + 标准化
    date_pipe = Pipeline(steps=[
        ("date_features", DateFeatureExtractor(columns=date_cols)),
        ("scaler", StandardScaler(with_mean=True, with_std=True))
    ])

    # 分类型管线:文本统一 + 低频折叠 + One-Hot
    categorical_pipe = Pipeline(steps=[
        ("text_norm", CategoricalTextNormalizer(columns=categorical_cols)),
        ("rare", RareCategoryGrouper(columns=categorical_cols, min_freq=0.01, min_count=None)),
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse=False))
    ])

    # 组合各列
    transformers = []
    if numeric_cols:
        transformers.append(("num", numeric_pipe, numeric_cols))
    if date_cols:
        transformers.append(("date", date_pipe, date_cols))
    if categorical_cols:
        transformers.append(("cat", categorical_pipe, categorical_cols))

    preprocessor = ColumnTransformer(transformers=transformers, remainder="drop", sparse_threshold=0.0)
    pipeline = Pipeline(steps=[("preprocessor", preprocessor)])

    # 输出特征名:需在拟合后才能确定(尤其是OHE)
    return pipeline, (numeric_cols, date_cols, categorical_cols)


def feature_names_after_fit(pipeline: Pipeline, numeric_cols: List[str], date_cols: List[str], categorical_cols: List[str]) -> List[str]:
    """从已拟合的ColumnTransformer中提取输出特征名。兼容不同sklearn版本。"""
    ct: ColumnTransformer = pipeline.named_steps["preprocessor"]
    names = []
    for name, transformer, cols in ct.transformers_:
        if name == "num":
            # StandardScaler不改变列名
            names.extend(cols)
        elif name == "date":
            # DateFeatureExtractor生成的派生列名
            df_extractor: DateFeatureExtractor = transformer.named_steps["date_features"]
            names.extend(df_extractor.feature_names_)
        elif name == "cat":
            ohe: OneHotEncoder = transformer.named_steps["ohe"]
            try:
                # 新版支持 get_feature_names_out
                ohe_names = ohe.get_feature_names_out(categorical_cols).tolist()
            except Exception:
                # 回退:用类别组合生成
                ohe_names = []
                for i, c in enumerate(categorical_cols):
                    cats = ohe.categories_[i]
                    ohe_names.extend([f"{c}__{cat}" for cat in cats])
            names.extend(ohe_names)
    return names


def main():
    parser = argparse.ArgumentParser(description="数据集标准化与统一编码预处理脚本")
    parser.add_argument("--input", required=True, help="输入CSV文件路径")
    parser.add_argument("--output", required=True, help="输出文件路径(建议.parquet)")
    parser.add_argument("--sep", default=",", help="CSV分隔符,默认','")
    parser.add_argument("--encoding", default="utf-8", help="输入文件编码,默认utf-8")
    parser.add_argument("--save-pipeline", default="preprocessor.joblib", help="保存预处理器的路径")
    parser.add_argument("--nrows", type=int, default=None, help="可选:仅读取前n行用于快速试跑")
    args = parser.parse_args()

    # 读取数据
    df = pd.read_csv(args.input, sep=args.sep, encoding=args.encoding, low_memory=False, nrows=args.nrows)

    # 构建并拟合预处理器
    pipeline, (numeric_cols, date_cols, categorical_cols) = build_preprocessor(df)
    pipeline.fit(df)

    # 变换数据
    X = pipeline.transform(df)

    # 输出特征名并保存为DataFrame
    feat_names = feature_names_after_fit(pipeline, numeric_cols, date_cols, categorical_cols)
    # 若长度不匹配,回退生成通用列名
    if X.shape[1] != len(feat_names):
        feat_names = [f"f_{i}" for i in range(X.shape[1])]
    out_df = pd.DataFrame(X, columns=feat_names, index=df.index)

    # 保存为Parquet(统一UTF-8,压缩)
    out_path = args.output
    ext = os.path.splitext(out_path)[1].lower()
    if ext in [".parquet", ".pq"]:
        try:
            out_df.to_parquet(out_path, engine="pyarrow", index=False)
        except Exception:
            # 回退到fastparquet
            out_df.to_parquet(out_path, engine="fastparquet", index=False)
    else:
        # CSV输出,确保UTF-8
        out_df.to_csv(out_path, index=False, encoding="utf-8")

    # 保存预处理器以保证训练-推理一致
    dump(pipeline, args.save_pipeline)

    # 简要信息
    print("预处理完成:")
    print(f"- 数值列:{len(numeric_cols)}")
    print(f"- 日期列:{len(date_cols)}")
    print(f"- 分类型列:{len(categorical_cols)}")
    print(f"- 输出特征数:{out_df.shape[1]}")
    print(f"- 已保存数据:{out_path}")
    print(f"- 已保存预处理器:{args.save_pipeline}")


if __name__ == "__main__":
    main()
```

Notes and recommendations:
- IQR clipping combined with StandardScaler suits numeric distributions that contain outliers; if the outliers are extreme and frequent, replace StandardScaler with RobustScaler for extra robustness (see the sketch after this list).
- The rare-category threshold min_freq=1% keeps the one-hot dimensionality under control; raise or lower it according to the actual number of unique values and available memory, or use min_count as an absolute threshold.
- Date features are derived as plain numeric columns and then standardized; the original datetime columns are not kept. If the business needs more time features (e.g. the offset from a reference date, holiday flags), extend DateFeatureExtractor.
- The script does not depend on a target variable; in a modeling workflow, fit the pipeline on the training set only and then call transform on the validation/test sets to avoid data leakage.
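
For the RobustScaler suggestion above, here is a minimal, standalone sketch (with made-up values) contrasting the two scalers on data containing one extreme outlier; it is an illustration of the trade-off, not part of the script.

```python
# Minimal sketch: RobustScaler vs. StandardScaler on data with one extreme outlier.
# RobustScaler centers on the median and scales by the IQR, so the outlier has
# little influence on how the "normal" points are scaled; StandardScaler's mean
# and standard deviation are dominated by the outlier.
import numpy as np
from sklearn.preprocessing import RobustScaler, StandardScaler

x = np.array([[1.0], [2.0], [3.0], [4.0], [1000.0]])  # one extreme outlier
print(StandardScaler().fit_transform(x).ravel().round(2))
print(RobustScaler().fit_transform(x).ravel().round(2))
```

Swapping the scaler in the script amounts to replacing the "scaler" step of numeric_pipe in build_preprocessor.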

Example 2

The following is a runnable standardization script aimed at common problems in a "transactions table + users table" setup: non-unique primary keys, inconsistent categorical encoding, and timezones that need unifying. The script can:
- Normalize column names to snake_case
- Unify time columns (from configuration or auto-detection) to a target timezone
- Deduplicate on a configurable primary key (by default keeping the "latest" record according to a time-priority order)
- Clean categorical fields consistently and encode them with a single vocabulary shared across both tables
- Output the cleaned tables, the encoding vocabulary, a data dictionary, and a deduplication report

Specify the primary keys, timezone, and time-priority columns for your actual field names via command-line arguments or configuration.

The code:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
标准化流水线脚本:交易表与用户表
功能:
- 统一列名
- 主键去重(支持按时间优先级保留最新记录)
- 时区统一
- 分类型字段一致化与编码
- 输出清洗结果与元数据工件

依赖:
- Python 3.8+
- pandas, numpy

使用示例:
python standardize_pipeline.py \
  --transactions-path ./data/transactions.csv \
  --users-path ./data/users.parquet \
  --output-dir ./out \
  --transactions-pk transaction_id \
  --users-pk user_id \
  --target-tz UTC \
  --naive-source-tz Asia/Shanghai \
  --transactions-time-priority event_time,updated_at \
  --users-time-priority updated_at,created_at \
  --infer-categorical
"""

import argparse
import json
import os
import re
import sys
import unicodedata
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# 兼容 zoneinfo(优先)与 pytz
try:
    from zoneinfo import ZoneInfo  # py39+
    _tz = ZoneInfo
except Exception:
    try:
        import pytz
        def _tz(name: str):
            return pytz.timezone(name)
    except Exception:
        _tz = None


# ============ 工具函数 ============

def standardize_colname(name: str) -> str:
    """列名标准化为 snake_case,移除前后空格并规范字符宽度。"""
    if name is None:
        return name
    s = unicodedata.normalize("NFKC", str(name)).strip()
    s = s.replace("-", "_").replace("/", "_").replace(".", "_")
    s = re.sub(r"[^\w]+", "_", s)
    s = re.sub(r"_+", "_", s)
    s = s.strip("_")
    return s.lower()


def standardize_columns(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    df.columns = [standardize_colname(c) for c in df.columns]
    return df


def read_any(path: str) -> pd.DataFrame:
    """按扩展名读取 CSV/Parquet/Feather。"""
    ext = os.path.splitext(path)[1].lower()
    if ext in [".csv", ".txt"]:
        return pd.read_csv(path)
    if ext in [".parquet", ".pq"]:
        return pd.read_parquet(path)
    if ext in [".feather", ".ft"]:
        return pd.read_feather(path)
    raise ValueError(f"不支持的文件扩展名: {ext}")


def write_any(df: pd.DataFrame, path: str):
    """按扩展名写出到 CSV 或 Parquet。"""
    ext = os.path.splitext(path)[1].lower()
    os.makedirs(os.path.dirname(path), exist_ok=True)
    if ext in [".csv", ".txt"]:
        df.to_csv(path, index=False)
    elif ext in [".parquet", ".pq"]:
        df.to_parquet(path, index=False)
    else:
        raise ValueError(f"不支持的输出扩展名: {ext}")


def detect_datetime_columns(df: pd.DataFrame, sample_size: int = 1000, success_rate: float = 0.6) -> List[str]:
    """基于采样的鲁棒时间列识别。"""
    cols = []
    n = min(len(df), sample_size)
    sample_df = df.sample(n, random_state=42) if n > 0 else df
    for c in df.columns:
        if df[c].dtype.kind in "Mm":  # 已是 datetime64 dtype
            cols.append(c)
            continue
        if df[c].dtype == object or np.issubdtype(df[c].dtype, np.number):
            try:
                parsed = pd.to_datetime(sample_df[c], errors="coerce")
                rate = parsed.notna().mean() if len(parsed) > 0 else 0.0
                if rate >= success_rate:
                    cols.append(c)
            except Exception:
                pass
    return cols


def parse_any_timestamp(val, naive_source_tz: Optional[str], target_tz: str):
    """单值层面的时间解析与时区统一。对混合格式具有鲁棒性。"""
    if pd.isna(val):
        return pd.NaT
    try:
        # 数字视为 epoch 秒/毫秒
        if isinstance(val, (int, np.integer, float, np.floating)) and not isinstance(val, bool):
            v = float(val)
            if np.isnan(v):
                return pd.NaT
            # 简单判别位数
            if abs(v) >= 1e12:  # 毫秒
                ts = pd.to_datetime(int(v), unit="ms", utc=False)
            else:  # 秒
                ts = pd.to_datetime(int(v), unit="s", utc=False)
        else:
            ts = pd.to_datetime(val, errors="coerce", utc=False)
        if ts is pd.NaT:
            return pd.NaT
        # 时区处理
        if getattr(ts, "tzinfo", None) is None:
            if naive_source_tz:
                if _tz is None:
                    raise RuntimeError("缺少时区库(zoneinfo/pytz),无法本地化 naive 时间")
                ts = ts.tz_localize(_tz(naive_source_tz), ambiguous="NaT", nonexistent="NaT")
            else:
                # 默认将 naive 视作 UTC,以避免错误偏移(可通过参数覆盖)
                if _tz is None:
                    return ts  # 退化为 naive
                ts = ts.tz_localize(_tz("UTC"))
        if _tz is None:
            return ts  # 无法进一步转换
        return ts.tz_convert(_tz(target_tz))
    except Exception:
        return pd.NaT


def unify_timezones(
    df: pd.DataFrame,
    tz_target: str,
    tz_source_naive: Optional[str],
    datetime_cols: Optional[List[str]] = None
) -> pd.DataFrame:
    """将指定时间列统一到目标时区;未指定则自动识别。"""
    if tz_target is None:
        return df
    if datetime_cols is None:
        datetime_cols = detect_datetime_columns(df)
    if not datetime_cols:
        return df
    out = df.copy()
    for c in datetime_cols:
        out[c] = out[c].apply(lambda x: parse_any_timestamp(x, tz_source_naive, tz_target))
    return out


def last_non_null_with_priority(df: pd.DataFrame, sort_cols: List[str]) -> pd.Series:
    """
    按时间优先级排序后,返回每列最后一个非空值。
    假设 df 已按 sort_cols 排序(升序/降序在上层控制)。
    """
    return df.ffill().bfill().iloc[-1]


def deduplicate_by_pk(
    df: pd.DataFrame,
    pk: str,
    time_priority: Optional[List[str]] = None,
    ascending: bool = True
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """
    主键去重,默认按时间优先级列排序,保留“最新”记录。
    返回:去重后的数据框、被移除的重复记录明细。
    """
    if pk not in df.columns:
        raise ValueError(f"主键列 {pk} 不在数据集中")

    dup_mask = df[pk].duplicated(keep=False)
    dup_df = df.loc[dup_mask].copy()

    if time_priority:
        # 仅使用存在的排序列
        sort_cols = [c for c in time_priority if c in df.columns]
        if sort_cols:
            # 缺失时间列排序时,将缺失值置于最前/最后的行为由 na_position 控制
            df_sorted = df.sort_values(by=sort_cols, ascending=ascending, na_position="first")
            # drop_duplicates 保留最后一条(即“最新”)
            deduped = df_sorted.drop_duplicates(subset=[pk], keep="last")
        else:
            deduped = df.drop_duplicates(subset=[pk], keep="last")
    else:
        deduped = df.drop_duplicates(subset=[pk], keep="last")

    # 以索引差集得到被移除的记录;若用 concat + drop_duplicates(keep=False),
    # 完全相同的重复行会相互抵消,导致报告遗漏。
    removed = df.loc[~df.index.isin(deduped.index)].copy()
    return deduped.reset_index(drop=True), removed.reset_index(drop=True)


def is_probably_boolean_col(name: str, series: pd.Series) -> bool:
    """基于列名与取值判断是否布尔/标志类列。"""
    name_hit = bool(re.search(r"(is_|has_|flag|active|valid|enabled|deleted|available|status)$", name))
    # 低基数时进一步判断
    if series.dtype == object or isinstance(series.dtype, pd.CategoricalDtype):
        vals = pd.Series(series.dropna().astype(str).str.lower().str.strip().unique())
        val_hit = vals.isin(["y", "n", "yes", "no", "true", "false", "0", "1", "是", "否"]).mean() >= 0.6
    else:
        val_hit = False
    return name_hit or val_hit


def canonicalize_category_value(x: object) -> object:
    """通用分类型值清洗(大小写、空白、全半角、统一空值)。"""
    if pd.isna(x):
        return np.nan
    s = str(x)
    s = unicodedata.normalize("NFKC", s)
    s = s.strip().lower()
    s = re.sub(r"\s+", " ", s)
    if s in ["", "nan", "none", "null", "na"]:
        return np.nan
    return s


def normalize_boolean_value(x: object) -> object:
    """布尔值统一为 'yes'/'no'(保持字符串表示,便于后续一致编码)。"""
    if pd.isna(x):
        return np.nan
    s = str(x).strip().lower()
    mapping = {
        "y": "yes", "yes": "yes", "true": "yes", "1": "yes", "t": "yes", "是": "yes",
        "n": "no", "no": "no", "false": "no", "0": "no", "f": "no", "否": "no"
    }
    return mapping.get(s, canonicalize_category_value(s))


def infer_categorical_columns(df: pd.DataFrame, max_unique: int = 1000, unique_ratio: float = 0.5) -> List[str]:
    """自动识别分类型列:低基数或唯一比率较低的 object/category 列。"""
    cats = []
    for c in df.columns:
        if df[c].dtype == object or isinstance(df[c].dtype, pd.CategoricalDtype):
            nunique = df[c].nunique(dropna=True)
            if nunique <= max_unique and (nunique / max(1, len(df))) <= unique_ratio:
                cats.append(c)
    return cats


def build_global_category_vocab(
    dfs: List[pd.DataFrame],
    cat_cols: List[str],
    boolean_cols: Optional[List[str]] = None
) -> Dict[str, Dict[str, int]]:
    """
    构建全局一致编码词表:对指定列跨表收集所有取值,排序后映射为稳定整数编码。
    - boolean 列先做 'yes'/'no' 归一
    - 其余列做通用清洗
    """
    vocab = {}
    boolean_cols = set(boolean_cols or [])
    for c in cat_cols:
        vals = []
        for df in dfs:
            if c in df.columns:
                series = df[c]
                if c in boolean_cols:
                    series = series.map(normalize_boolean_value)
                else:
                    series = series.map(canonicalize_category_value)
                vals.append(series.dropna().unique())
        if not vals:
            continue
        all_vals = pd.unique(pd.Series(np.concatenate(vals, axis=0)))
        # 排序稳定(先长度后字典序可以减少拼写接近值的编码跳变)
        all_vals_sorted = sorted(all_vals, key=lambda s: (len(str(s)), str(s)))
        vocab[c] = {str(v): i for i, v in enumerate(all_vals_sorted, start=1)}  # 从1开始编码,0保留为空
    return vocab


def apply_categorical_encoding(
    df: pd.DataFrame,
    vocab: Dict[str, Dict[str, int]],
    boolean_cols: Optional[List[str]] = None,
    drop_original: bool = False,
    suffix: str = "_enc"
) -> pd.DataFrame:
    """应用一致编码,生成 *_enc 整数列;可选择是否保留原列。"""
    out = df.copy()
    boolean_cols = set(boolean_cols or [])
    for c, mapping in vocab.items():
        if c not in out.columns:
            continue
        cleaned = out[c].map(normalize_boolean_value if c in boolean_cols else canonicalize_category_value)
        enc = cleaned.map(lambda v: (0 if pd.isna(v) else mapping.get(str(v), 0))).astype("Int32")
        new_col = f"{c}{suffix}"
        out[new_col] = enc
        if drop_original:
            out.drop(columns=[c], inplace=True)
    return out


def save_metadata(
    output_dir: str,
    artifacts: Dict[str, object],
    fname: str = "metadata.json"
):
    os.makedirs(output_dir, exist_ok=True)
    path = os.path.join(output_dir, fname)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(artifacts, f, ensure_ascii=False, indent=2)


# ============ 主流程 ============

def run_pipeline(
    transactions_path: str,
    users_path: str,
    output_dir: str,
    transactions_pk: str,
    users_pk: str,
    target_tz: Optional[str] = "UTC",
    naive_source_tz: Optional[str] = None,
    transactions_time_priority: Optional[List[str]] = None,
    users_time_priority: Optional[List[str]] = None,
    infer_cats: bool = True,
    explicit_cat_cols: Optional[List[str]] = None,
    drop_original_cats: bool = False
):
    # 1) 读取
    tx = read_any(transactions_path)
    us = read_any(users_path)

    # 2) 列名规范
    tx = standardize_columns(tx)
    us = standardize_columns(us)
    transactions_pk = standardize_colname(transactions_pk)
    users_pk = standardize_colname(users_pk)
    transactions_time_priority = [standardize_colname(c) for c in (transactions_time_priority or [])]
    users_time_priority = [standardize_colname(c) for c in (users_time_priority or [])]
    if explicit_cat_cols:
        explicit_cat_cols = [standardize_colname(c) for c in explicit_cat_cols]

    # 3) 时区统一(自动识别时间列,统一到 target_tz;naive 使用 naive_source_tz 本地化)
    tx = unify_timezones(tx, tz_target=target_tz, tz_source_naive=naive_source_tz)
    us = unify_timezones(us, tz_target=target_tz, tz_source_naive=naive_source_tz)

    # 4) 主键去重(默认保留最新)
    tx_dedup, tx_removed = deduplicate_by_pk(
        tx, pk=transactions_pk, time_priority=transactions_time_priority, ascending=True
    )
    us_dedup, us_removed = deduplicate_by_pk(
        us, pk=users_pk, time_priority=users_time_priority, ascending=True
    )

    # 5) 分类型列识别
    if explicit_cat_cols:
        cat_cols = explicit_cat_cols
    else:
        # 合并两个表的候选分类型列,取交集优先保障跨表一致性;若交集为空,取并集
        tx_cats = infer_categorical_columns(tx_dedup) if infer_cats else []
        us_cats = infer_categorical_columns(us_dedup) if infer_cats else []
        inter = sorted(list(set(tx_cats).intersection(us_cats)))
        union = sorted(list(set(tx_cats).union(us_cats)))
        cat_cols = inter if inter else union

    # 根据列名/取值推断布尔列
    boolean_cols = []
    for c in cat_cols:
        s_tx = tx_dedup[c] if c in tx_dedup.columns else pd.Series([], dtype=object)
        s_us = us_dedup[c] if c in us_dedup.columns else pd.Series([], dtype=object)
        s = pd.concat([s_tx, s_us], ignore_index=True)
        if is_probably_boolean_col(c, s):
            boolean_cols.append(c)

    # 6) 构建全局一致编码词表并应用
    vocab = build_global_category_vocab([tx_dedup, us_dedup], cat_cols=cat_cols, boolean_cols=boolean_cols)
    tx_std = apply_categorical_encoding(
        tx_dedup, vocab=vocab, boolean_cols=boolean_cols, drop_original=drop_original_cats
    )
    us_std = apply_categorical_encoding(
        us_dedup, vocab=vocab, boolean_cols=boolean_cols, drop_original=drop_original_cats
    )

    # 7) 输出
    os.makedirs(output_dir, exist_ok=True)
    # 数据
    write_any(tx_std, os.path.join(output_dir, "transactions_standardized.parquet"))
    write_any(us_std, os.path.join(output_dir, "users_standardized.parquet"))
    # 去重报告
    tx_removed.to_csv(os.path.join(output_dir, "transactions_removed_duplicates.csv"), index=False)
    us_removed.to_csv(os.path.join(output_dir, "users_removed_duplicates.csv"), index=False)
    # 词表与元数据
    artifacts = {
        "target_timezone": target_tz,
        "naive_source_timezone": naive_source_tz,
        "transactions": {
            "path": transactions_path,
            "primary_key": transactions_pk,
            "time_priority": transactions_time_priority,
            "rows_in": int(len(tx)),
            "rows_out": int(len(tx_std)),
            "duplicates_removed": int(len(tx_removed)),
        },
        "users": {
            "path": users_path,
            "primary_key": users_pk,
            "time_priority": users_time_priority,
            "rows_in": int(len(us)),
            "rows_out": int(len(us_std)),
            "duplicates_removed": int(len(us_removed)),
        },
        "categorical_columns": cat_cols,
        "boolean_like_columns": boolean_cols,
        "categorical_vocab": vocab,
        "dtype_after_transactions": {c: str(tx_std[c].dtype) for c in tx_std.columns},
        "dtype_after_users": {c: str(us_std[c].dtype) for c in us_std.columns},
    }
    save_metadata(output_dir, artifacts, fname="metadata.json")

    print("标准化完成:")
    print(f"- 交易表输入/输出行数:{len(tx)} -> {len(tx_std)};去重条数:{len(tx_removed)}")
    print(f"- 用户表输入/输出行数:{len(us)} -> {len(us_std)};去重条数:{len(us_removed)}")
    print(f"- 结果目录:{output_dir}")


def parse_args():
    p = argparse.ArgumentParser(description="交易与用户表标准化脚本")
    p.add_argument("--transactions-path", required=True, help="交易表路径(csv/parquet/feather)")
    p.add_argument("--users-path", required=True, help="用户表路径(csv/parquet/feather)")
    p.add_argument("--output-dir", required=True, help="输出目录")

    p.add_argument("--transactions-pk", required=True, help="交易表主键列名")
    p.add_argument("--users-pk", required=True, help="用户表主键列名")

    p.add_argument("--target-tz", default="UTC", help="目标时区(如 UTC 或 Asia/Shanghai)")
    p.add_argument("--naive-source-tz", default=None, help="当时间值缺少时区信息时的默认来源时区")

    p.add_argument("--transactions-time-priority", default=None,
                   help="交易表时间优先级列,逗号分隔(如 event_time,updated_at)")
    p.add_argument("--users-time-priority", default=None,
                   help="用户表时间优先级列,逗号分隔(如 updated_at,created_at)")

    p.add_argument("--infer-categorical", action="store_true",
                   help="自动识别分类型列(否则仅按 --categorical-cols 使用)")
    p.add_argument("--categorical-cols", default=None,
                   help="显式指定需要一致编码的列名,逗号分隔(若提供则覆盖自动识别)")
    p.add_argument("--drop-original-cats", action="store_true",
                   help="是否在编码后删除原始分类型列")

    args = p.parse_args()

    def split_cols(v: Optional[str]) -> Optional[List[str]]:
        if v is None or str(v).strip() == "":
            return None
        return [s.strip() for s in v.split(",") if s.strip()]

    return {
        "transactions_path": args.transactions_path,
        "users_path": args.users_path,
        "output_dir": args.output_dir,
        "transactions_pk": args.transactions_pk,
        "users_pk": args.users_pk,
        "target_tz": args.target_tz,
        "naive_source_tz": args.naive_source_tz,
        "transactions_time_priority": split_cols(args.transactions_time_priority),
        "users_time_priority": split_cols(args.users_time_priority),
        "infer_cats": bool(args.infer_categorical),
        "explicit_cat_cols": split_cols(args.categorical_cols),
        "drop_original_cats": bool(args.drop_original_cats),
    }


if __name__ == "__main__":
    params = parse_args()
    try:
        run_pipeline(**params)
    except Exception as e:
        print(f"错误:{e}", file=sys.stderr)
        sys.exit(1)
```

Notes and recommendations:
- Non-unique primary keys: by default the script sorts by the time-priority columns and keeps the "latest" record; specify the priority via --transactions-time-priority and --users-time-priority. If no priority is given, only row order is used and the last occurrence is kept.
- Timezone unification: time columns with mixed formats and mixed timezone-awareness are parsed value by value; for naive timestamps, pass --naive-source-tz to name the source timezone so they are not silently treated as UTC. Set the target timezone with --target-tz.
- Inconsistent categorical encoding: use --infer-categorical to auto-detect categorical columns, or list them explicitly with --categorical-cols. The script builds one vocabulary across both tables and writes it to categorical_vocab in metadata.json.
- Performance: per-value timestamp parsing can be slow on very large data. If the time columns are known and consistently formatted, convert them to datetime in a vectorized way first (see the sketch after this list) and then run the script's deduplication and encoding steps, or rewrite unify_timezones to operate in a vectorized manner.
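
As a minimal sketch of that vectorized alternative (the column name event_time and the source timezone are placeholders), a consistently formatted column can be parsed once and localized/converted in bulk instead of value by value:

```python
# Minimal sketch of a vectorized alternative to per-value parsing, for a column
# with a consistent format. Placeholder names: "event_time", "Asia/Shanghai".
import pandas as pd

def unify_timezone_vectorized(s: pd.Series, source_tz: str = "Asia/Shanghai",
                              target_tz: str = "UTC") -> pd.Series:
    dt = pd.to_datetime(s, errors="coerce")  # one vectorized parse
    if dt.dt.tz is None:                     # naive: localize to the known source tz
        dt = dt.dt.tz_localize(source_tz, ambiguous="NaT", nonexistent="NaT")
    return dt.dt.tz_convert(target_tz)       # then convert to the target tz

# Example: df["event_time"] = unify_timezone_vectorized(df["event_time"])
```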

Example 3

Below is a self-contained, reproducible Python script that standardizes datasets containing continuous variables, categorical variables, and timestamps. It ensures unified categorical encoding across small-sample, multi-batch scenarios, and saves the fitted preprocessing artifacts for consistent application to future batches.

Key properties:
- Deterministic, reproducible preprocessing and encoding.
- Unified categorical encoding with explicit handling for missing and unseen categories.
- Timestamp normalization to UTC epoch seconds and inclusion in numeric standardization.
- Artifacts (pipeline and metadata) are saved and reused across batches.

Script:

```python
#!/usr/bin/env python3
"""
Dataset Standardization Script: continuous + categorical + timestamp
- Fit mode: builds and saves preprocessing artifacts from training batches.
- Transform mode: applies saved artifacts to new batches, producing standardized output.

Requirements:
  - Python 3.8+
  - pandas >= 1.3
  - numpy >= 1.20
  - scikit-learn >= 1.2
  - joblib

Example usage:
  Fit on training directory:
    python standardize.py fit \
      --input_dir ./train_batches \
      --schema ./schema.json \
      --output_dir ./artifacts

  Transform a new batch:
    python standardize.py transform \
      --input_file ./batch_new.csv \
      --artifacts_dir ./artifacts \
      --output_file ./batch_new_standardized.csv

Schema (JSON) example:
{
  "continuous": ["num_col1", "num_col2"],
  "categorical": ["cat_a", "cat_b"],
  "timestamp": ["event_time"],
  "timestamp_parsing": {
    "format": null,            // optional strftime format; null = auto
    "timezone": "UTC"          // timezone of input if naive; converted to UTC
  },
  "imputation": {
    "numeric_strategy": "median",
    "categorical_missing_token": "__MISSING__",
    "unseen_token": "__UNSEEN__"
  }
}
"""
import argparse
import json
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import joblib
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer


# -----------------------
# Utilities
# -----------------------
def _load_schema(schema_path: str) -> Dict:
    with open(schema_path, "r", encoding="utf-8") as f:
        schema = json.load(f)

    # Basic validation
    for key in ["continuous", "categorical", "timestamp"]:
        if key not in schema or not isinstance(schema[key], list):
            schema[key] = []

    # Defaults
    ts_parsing = schema.get("timestamp_parsing", {})
    schema["timestamp_parsing"] = {
        "format": ts_parsing.get("format", None),
        "timezone": ts_parsing.get("timezone", "UTC"),
    }

    imp = schema.get("imputation", {})
    schema["imputation"] = {
        "numeric_strategy": imp.get("numeric_strategy", "median"),
        "categorical_missing_token": imp.get("categorical_missing_token", "__MISSING__"),
        "unseen_token": imp.get("unseen_token", "__UNSEEN__"),
    }
    return schema


def _read_csvs_from_dir(input_dir: str) -> List[pd.DataFrame]:
    files = sorted([str(p) for p in Path(input_dir).glob("*.csv")])
    if not files:
        raise FileNotFoundError(f"No CSV files found in directory: {input_dir}")
    dfs = []
    for fp in files:
        df = pd.read_csv(fp)
        dfs.append(df)
    return dfs


def _read_csv(input_file: str) -> pd.DataFrame:
    return pd.read_csv(input_file)


def _parse_timestamp_series(
    s: pd.Series,
    fmt: Optional[str],
    tz: Optional[str],
) -> pd.Series:
    # Robust parsing
    # If fmt given, use it; else let pandas infer
    if fmt:
        dt = pd.to_datetime(s, format=fmt, errors="coerce")
    else:
        dt = pd.to_datetime(s, errors="coerce")

    # Handle timezone: if naive, localize to tz; then convert to UTC
    tz_in = tz or "UTC"
    if dt.dt.tz is None:
        # localize naive timestamps to tz_in
        try:
            dt = dt.dt.tz_localize(tz_in)
        except Exception:
            # If localization fails (e.g., invalid tz), fallback to UTC
            dt = dt.dt.tz_localize("UTC")
    # Convert to UTC
    dt = dt.dt.tz_convert("UTC")

    # Convert to epoch seconds (float); subtracting the epoch keeps NaT as NaN,
    # which the numeric imputer handles downstream (Series.view is deprecated).
    epoch_seconds = (dt - pd.Timestamp("1970-01-01", tz="UTC")) / pd.Timedelta(seconds=1)
    return epoch_seconds


def _preprocess_timestamps(
    df: pd.DataFrame,
    timestamp_cols: List[str],
    ts_format: Optional[str],
    tz: Optional[str],
) -> Tuple[pd.DataFrame, List[str]]:
    """
    Converts timestamp columns to numeric epoch seconds in UTC.
    Returns:
      - df with additional numeric columns "<col>__epoch_seconds"
      - list of new timestamp feature names
    """
    ts_numeric_cols = []
    for col in timestamp_cols:
        if col not in df.columns:
            # missing timestamp column; create NaNs
            df[col] = np.nan
        epoch = _parse_timestamp_series(df[col], ts_format, tz)
        new_col = f"{col}__epoch_seconds"
        df[new_col] = epoch.astype(float)
        ts_numeric_cols.append(new_col)
        # Drop original timestamp column to avoid accidental reuse
        df.drop(columns=[col], inplace=True)
    return df, ts_numeric_cols


def _collect_categorical_sets(
    dfs: List[pd.DataFrame],
    categorical_cols: List[str],
    missing_token: str,
    unseen_token: str,
) -> Dict[str, List[str]]:
    """
    Collects sorted unique categorical values across all training batches.
    Adds missing_token and unseen_token explicitly to categories for stable encoding.
    """
    cat_values = {c: set() for c in categorical_cols}
    for df in dfs:
        for c in categorical_cols:
            if c not in df.columns:
                continue
            # Convert to string to enforce consistent dtype
            vals = df[c].astype("string")
            # Exclude NA here; missing handled via imputation, but still add missing_token explicitly
            uniq = set([str(v) for v in vals.dropna().unique().tolist()])
            cat_values[c].update(uniq)

    categories = {}
    for c in categorical_cols:
        base = sorted(cat_values[c]) if cat_values[c] else []
        # Ensure deterministic presence and order of special tokens
        if missing_token not in base:
            base.append(missing_token)
        if unseen_token not in base:
            base.append(unseen_token)
        categories[c] = base
    return categories


def _map_unseen_categories(
    df: pd.DataFrame,
    categorical_cols: List[str],
    known_categories: Dict[str, List[str]],
    missing_token: str,
    unseen_token: str,
) -> pd.DataFrame:
    """
    Maps categorical values not in known_categories to unseen_token.
    Explicitly fills missing with missing_token.
    """
    for c in categorical_cols:
        if c not in df.columns:
            # create column filled as missing_token
            df[c] = missing_token
        # Ensure string dtype
        s = df[c].astype("string")
        known_set = set(known_categories[c])
        s = s.fillna(missing_token)
        df[c] = s.where(s.isin(known_set), other=unseen_token).astype("string")
    return df


def _build_preprocessor(
    numeric_cols: List[str],
    categorical_cols: List[str],
    categories_map: Dict[str, List[str]],
    numeric_impute_strategy: str,
    missing_token: str,
) -> ColumnTransformer:
    """
    Builds the ColumnTransformer with:
      - Numeric: SimpleImputer + StandardScaler
      - Categorical: SimpleImputer (constant missing_token) + OneHotEncoder with fixed categories
    """
    numeric_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy=numeric_impute_strategy)),
            ("scaler", StandardScaler(with_mean=True, with_std=True)),
        ]
    )

    # OneHot with fixed categories in deterministic order
    cat_categories = [categories_map[c] for c in categorical_cols]
    categorical_pipeline = Pipeline(
        steps=[
            ("imputer", SimpleImputer(strategy="constant", fill_value=missing_token)),
            ("onehot", OneHotEncoder(categories=cat_categories, handle_unknown="ignore", sparse=False)),
        ]
    )

    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_pipeline, numeric_cols),
            ("cat", categorical_pipeline, categorical_cols),
        ],
        remainder="drop",
        verbose_feature_names_out=True,
    )
    return preprocessor


def _fit(
    input_dir: str,
    schema_path: str,
    output_dir: str,
) -> None:
    os.makedirs(output_dir, exist_ok=True)

    schema = _load_schema(schema_path)
    continuous = schema["continuous"]
    categorical = schema["categorical"]
    timestamp = schema["timestamp"]
    ts_fmt = schema["timestamp_parsing"]["format"]
    tz = schema["timestamp_parsing"]["timezone"]
    numeric_strategy = schema["imputation"]["numeric_strategy"]
    missing_token = schema["imputation"]["categorical_missing_token"]
    unseen_token = schema["imputation"]["unseen_token"]

    dfs = _read_csvs_from_dir(input_dir)

    # Collect categories across all batches
    categories_map = _collect_categorical_sets(dfs, categorical, missing_token, unseen_token)

    # Concatenate for fitting scaler statistics
    # For small samples, we can load all to memory
    df_all = pd.concat(dfs, axis=0, ignore_index=True)

    # Timestamp preprocessing -> numeric epoch seconds
    df_all, ts_numeric_cols = _preprocess_timestamps(df_all, timestamp, ts_fmt, tz)

    # Enforce dtypes and map unknowns for categorical columns using collected categories
    df_all = _map_unseen_categories(df_all, categorical, categories_map, missing_token, unseen_token)

    # Numeric columns include continuous + engineered timestamp numeric cols
    numeric_cols = list(continuous) + list(ts_numeric_cols)

    # Build and fit preprocessor
    preprocessor = _build_preprocessor(
        numeric_cols=numeric_cols,
        categorical_cols=categorical,
        categories_map=categories_map,
        numeric_impute_strategy=numeric_strategy,
        missing_token=missing_token,
    )

    # Fit
    preprocessor.fit(df_all)

    # Persist artifacts
    joblib.dump(preprocessor, os.path.join(output_dir, "preprocessor.joblib"))

    # Derive output feature names for reproducibility
    try:
        feature_names = preprocessor.get_feature_names_out().tolist()
    except Exception:
        feature_names = None

    metadata = {
        "schema": {
            "continuous": continuous,
            "categorical": categorical,
            "timestamp": timestamp,
            "timestamp_parsing": {"format": ts_fmt, "timezone": tz},
            "imputation": {
                "numeric_strategy": numeric_strategy,
                "categorical_missing_token": missing_token,
                "unseen_token": unseen_token,
            },
        },
        "categories_map": categories_map,       # fixed categories per categorical column
        "numeric_cols": numeric_cols,           # includes timestamp-derived numeric columns
        "feature_names_out": feature_names,     # final standardized feature names (if available)
        "version": "1.0",
    }

    with open(os.path.join(output_dir, "metadata.json"), "w", encoding="utf-8") as f:
        json.dump(metadata, f, ensure_ascii=False, indent=2)

    print(f"Fit complete. Artifacts saved to: {output_dir}")


def _transform(
    input_file: str,
    artifacts_dir: str,
    output_file: str,
) -> None:
    # Load artifacts
    preprocessor = joblib.load(os.path.join(artifacts_dir, "preprocessor.joblib"))
    with open(os.path.join(artifacts_dir, "metadata.json"), "r", encoding="utf-8") as f:
        metadata = json.load(f)

    schema = metadata["schema"]
    categories_map = metadata["categories_map"]
    continuous = schema["continuous"]
    categorical = schema["categorical"]
    timestamp = schema["timestamp"]
    ts_fmt = schema["timestamp_parsing"]["format"]
    tz = schema["timestamp_parsing"]["timezone"]
    missing_token = schema["imputation"]["categorical_missing_token"]
    unseen_token = schema["imputation"]["unseen_token"]

    # Read input batch
    df = _read_csv(input_file)

    # Timestamp preprocessing
    df, ts_numeric_cols = _preprocess_timestamps(df, timestamp, ts_fmt, tz)
    numeric_cols = list(continuous) + list(ts_numeric_cols)

    # Map unseen and missing categories deterministically
    df = _map_unseen_categories(df, categorical, categories_map, missing_token, unseen_token)

    # Align missing numeric columns if absent
    for c in numeric_cols:
        if c not in df.columns:
            df[c] = np.nan
    # Align missing categorical columns if absent
    for c in categorical:
        if c not in df.columns:
            df[c] = missing_token

    # Transform
    X = preprocessor.transform(df)

    # Feature names
    feature_names = metadata.get("feature_names_out", None)
    if feature_names is None:
        # Fallback: generate placeholder names
        feature_names = [f"f{i}" for i in range(X.shape[1])]

    # Save standardized output
    out_df = pd.DataFrame(X, columns=feature_names)
    out_df.to_csv(output_file, index=False)

    print(f"Transform complete. Output saved to: {output_file}")


def main():
    parser = argparse.ArgumentParser(description="Standardize dataset across batches (continuous + categorical + timestamp).")
    subparsers = parser.add_subparsers(dest="mode", required=True)

    p_fit = subparsers.add_parser("fit", help="Fit preprocessing on training batches.")
    p_fit.add_argument("--input_dir", type=str, required=True, help="Directory with training CSV batches.")
    p_fit.add_argument("--schema", type=str, required=True, help="Path to JSON schema.")
    p_fit.add_argument("--output_dir", type=str, required=True, help="Directory to save artifacts.")

    p_transform = subparsers.add_parser("transform", help="Apply saved preprocessing to a new batch.")
    p_transform.add_argument("--input_file", type=str, required=True, help="CSV file to transform.")
    p_transform.add_argument("--artifacts_dir", type=str, required=True, help="Directory with saved artifacts.")
    p_transform.add_argument("--output_file", type=str, required=True, help="Where to write standardized CSV.")

    args = parser.parse_args()

    if args.mode == "fit":
        _fit(args.input_dir, args.schema, args.output_dir)
    elif args.mode == "transform":
        _transform(args.input_file, args.artifacts_dir, args.output_file)
    else:
        raise ValueError("Unsupported mode.")


if __name__ == "__main__":
    main()
```

Notes:
- Unified categorical encoding: categories are collected during fit, sorted deterministically, and augmented with __MISSING__ and __UNSEEN__ tokens to preserve a stable column space across batches. During transform, any value not in the known set maps to __UNSEEN__ (see the sketch after these notes).
- Timestamp handling: timestamps are parsed, localized if naive, converted to UTC, and normalized to epoch seconds. These become part of the numeric feature set and are standardized via z-scores.
- Imputation: numeric features use median imputation; categorical features use a constant token for missing values.
- Artifacts: the fitted ColumnTransformer and metadata (schema, categories, feature names) are saved and reused to ensure consistent preprocessing across batches.
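
As a tiny illustration of the stable category space (the values and category list below are made up), this is roughly what _map_unseen_categories plus the fixed OneHotEncoder categories achieve:

```python
# Tiny illustration (made-up values): the category list is frozen at fit time,
# missing values map to __MISSING__, anything outside the known set maps to
# __UNSEEN__, and the one-hot columns therefore stay identical for every batch.
import pandas as pd

known = ["blue", "red", "__MISSING__", "__UNSEEN__"]       # frozen at fit time
batch = pd.Series(["red", None, "green"], dtype="string")  # "green" was never seen

mapped = batch.fillna("__MISSING__")
mapped = mapped.where(mapped.isin(known), other="__UNSEEN__")
print(mapped.tolist())  # ['red', '__MISSING__', '__UNSEEN__']

onehot = pd.get_dummies(mapped.astype(pd.CategoricalDtype(categories=known)))
print(list(onehot.columns))  # ['blue', 'red', '__MISSING__', '__UNSEEN__']
```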

Who it's for

Data analysts

Quickly generate uniform standardization scripts, process multiple datasets in batches, shorten the time spent on exploration and modeling preparation, and improve the reliability of conclusions.

Business intelligence engineers

Establish consistent data definitions for reports and dashboards, clean and encode automatically, reduce repetitive work and disputes over definitions, and keep the release cadence steady.

Researchers

Record preprocessing in reproducible scripts, support repeated experimental comparisons, and safeguard the consistency and credibility of paper and project data.

Data product managers

Quickly assess the usability of requested data, produce standardization plans and accompanying documentation, drive cross-team review and rollout, and lower communication costs.

Operations and growth teams

Normalize channel and campaign data in one step, obtain usable metrics and features quickly, and accelerate A/B testing, retrospectives, and ad-spend optimization.

Instructors and trainers

Use the generated scripts as teaching examples, break down the standardization workflow and best practices, and improve students' hands-on skills and assignment quality.

Founders and small teams

Obtain runnable scripts without deep technical expertise, stand up a data foundation quickly, and validate product hypotheses and market feedback.

Data engineers

Turn standardization steps into reusable modules, unify the team's preprocessing conventions, and reduce maintenance cost and rework risk.

Problems it solves

Helps data and business teams quickly turn messy raw data into standardized, analysis-ready data. From one precise prompt, it generates a Python standardization script tailored to your data, covering the common cleaning steps: missing-value handling, outlier treatment, numeric scaling, categorical unification, and time-format normalization, together with clear usage steps and caveats. Data preparation shrinks from hours to minutes, definitions stay consistent, data quality improves, reliance on senior engineering resources drops, and reporting and strategy validation speed up, ultimately supporting efficiency gains and steadier business decisions.

Feature summary

Generates a runnable standardization script in one step, with processing steps and clear comments configured automatically from the data's characteristics.
Automatically detects numeric, categorical, and time fields and matches scaling, encoding, and cleaning strategies, reducing manual trial and error.
Flexible parameterization: missing-value imputation, outlier handling, and encoding method are all selectable; configure once, reuse across scenarios.
Recommends an approach for the target scenario, whether model training, reporting, or visualization, balancing effectiveness and interpretability.
Produces validation reports and hints that locate problematic columns and abnormal distributions, surfacing risks before go-live.
Supports multilingual explanations and comments for smoother cross-team sharing, lowering communication cost and delivery time.
Automatically outputs a reproducible list of steps for auditing, version comparison, and knowledge retention, ensuring consistent definitions.
Connects smoothly to downstream modeling workflows; the data is immediately usable, noticeably compressing preparation and iteration cycles.
Templates and parameters can be customized to fit industry rules and data definitions, with one-step switching between standardization strategies.

How to use the purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suited to quick personal trials and lightweight use.

2. Publish it as an API

Turn the prompt template into an API; your programs can modify the template parameters at will and call it through the interface, making automation and batch processing straightforward. Suited to developer integration and embedding in business systems.

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so that your AI application invokes the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.

What you get after purchase

The complete prompt template
- 249 tokens
- 2 adjustable parameters: { 数据集特性 } (dataset characteristics) and { 输出语言 } (output language)