数据预处理脚本生成

1 浏览
0 试用
0 购买
Sep 27, 2025更新

生成一个Python脚本,用于为机器学习准备数据集。

示例1

以下内容包含两部分:
- 设计说明:预处理目标、关键处理环节与策略
- 可运行的Python脚本:对混合表格数据进行系统化预处理,兼顾缺失值、离群点、类别统一与编码、单位与尺度统一、时间戳特征工程,并输出可直接用于机器学习的特征矩阵与持久化的预处理管道

一、设计说明
- 输入数据特征
  - 混合类型:数值、分类、时间戳
  - 数据质量问题:缺失值、离群点、类别编码不一致、单位与尺度不统一
- 处理策略
  - 列类型识别
    - 数值列:pandas数值类型
    - 分类列:object/category/低基数的字符串;自动规范化后编码
    - 时间戳列:原生datetime列或高占比可解析为日期时间的object列(基于样本解析成功率阈值)
  - 缺失值
    - 数值:中位数填充 + RobustScaler缩放;保留缺失指示列
    - 分类:众数填充;预处理阶段映射“NA/N/A/未知/—/?”等为缺失
    - 时间戳:解析失败视为缺失;保留缺失指示列
  - 离群点
    - 数值列按分位数剪裁(winsorize,默认1%与99%),减少极端值影响
    - 选择RobustScaler以增强对剩余异常的鲁棒性
  - 类别统一与编码
    - 文本规范化:去空格、统一大小写、合并重复空白、统一常见布尔值(是/否、yes/no、true/false、1/0)
    - 罕见类别折叠至“__other__”(基于频率阈值与top-k)
    - OneHotEncoder编码,未知类别忽略
  - 时间戳特征工程
    - 提取年、月、日、星期几、小时、是否周末、与数据集最大日期的间隔天数(可作为“时效性”代理)
    - 原始时间戳列不直接进入模型
  - 单位与尺度统一
    - 通过配置映射按列执行单位换算(乘以转换因子);无配置时不做推断
  - 可复用性与持久化
    - 使用ColumnTransformer + Pipeline构建可复用的预处理器
    - 导出处理后的特征矩阵、列名映射与预处理器对象

二、Python脚本
说明:脚本支持命令行参数;建议准备一个JSON配置文件以定义单位换算与列级同义词映射。若没有配置文件,脚本使用内置的通用默认行为。

"""
预处理脚本文件名示例:preprocess.py
依赖:pandas, numpy, scikit-learn, joblib
可选:scipy(用于保存稀疏矩阵,如未安装则以joblib保存)
"""

import argparse
import json
import re
import sys
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.model_selection import train_test_split
from joblib import dump

# 可选:若安装了scipy则用于保存稀疏矩阵
try:
    from scipy.sparse import save_npz, issparse
    SCIPY_AVAILABLE = True
except Exception:
    SCIPY_AVAILABLE = False


def load_config(config_path: Optional[str]) -> Dict:
    default_config = {
        "label_column": None,  # 目标列名;不指定则不分离y
        "datetime_columns": [],  # 显式给出时间戳列名;为空时自动推断
        "unit_mapping": {
            # 示例:
            # "amount": {"factor": 0.001, "note": "从毫元(1e-3元)转为元"}
        },
        "category_synonyms": {
            # 可按列定义同义词映射,例如:
            # "gender": {"m": "male", "男": "male", "f": "female", "女": "female"}
        },
        "rare_category": {
            "min_fraction": 0.01,  # 频率低于此阈值折叠
            "top_k": 50            # 保留频率最高的前K类,其他折叠
        },
        "outlier_clip": {
            "method": "quantile",
            "q_low": 0.01,
            "q_high": 0.99
        },
        "random_state": 42,
        "test_size": 0.0  # 若>0则进行train/test拆分
    }
    if config_path is None:
        return default_config
    with open(config_path, "r", encoding="utf-8") as f:
        user_cfg = json.load(f)
    # 递归合并默认配置
    def merge(a, b):
        for k, v in b.items():
            if isinstance(v, dict) and k in a and isinstance(a[k], dict):
                merge(a[k], v)
            else:
                a[k] = v
    merge(default_config, user_cfg)
    return default_config


def is_boolean_like(val: str) -> Optional[str]:
    # 统一常见布尔文本至 "yes"/"no"
    if val is None:
        return None
    s = str(val).strip().lower()
    true_set = {"yes", "y", "true", "t", "1", "是", "對", "对"}
    false_set = {"no", "n", "false", "f", "0", "否", "不"}
    if s in true_set:
        return "yes"
    if s in false_set:
        return "no"
    return None


def normalize_text(s: str) -> Optional[str]:
    if pd.isna(s):
        return np.nan
    t = str(s).strip().lower()
    t = re.sub(r"\s+", " ", t)
    t = t.replace("–", "-").replace("—", "-")
    # 常见缺失标记归为NaN
    missing_tokens = {"", "na", "n/a", "none", "null", "-", "?", "unknown", "未知", "未提供"}
    if t in missing_tokens:
        return np.nan
    # 统一布尔
    bl = is_boolean_like(t)
    if bl is not None:
        return bl
    return t


def apply_category_synonyms(series: pd.Series, mapping: Dict[str, str]) -> pd.Series:
    if not mapping:
        return series
    # 规范化键
    norm_map = {normalize_text(k): v for k, v in mapping.items()}
    return series.map(lambda x: norm_map.get(normalize_text(x), normalize_text(x)))


def collapse_rare_categories(series: pd.Series, min_fraction: float, top_k: int) -> pd.Series:
    s = series.astype("object")
    vc = s.value_counts(dropna=False)
    total = len(s)
    # 保留top_k与频率>=min_fraction的类别
    keep = set(vc.nlargest(top_k).index)
    keep |= set(vc[vc / total >= min_fraction].index)
    return s.map(lambda x: x if x in keep else "__other__")


def infer_datetime_columns(df: pd.DataFrame, explicit_cols: List[str]) -> List[str]:
    dt_cols = set([c for c in explicit_cols if c in df.columns])
    # 自动推断:object列高占比可解析为datetime
    obj_cols = df.select_dtypes(include=["object"]).columns
    for c in obj_cols:
        s = df[c].dropna().astype(str)
        if s.empty:
            continue
        sample = s.sample(min(len(s), 1000), random_state=0)
        parsed = pd.to_datetime(sample, errors="coerce", infer_datetime_format=True, utc=False)
        ok_ratio = parsed.notna().mean()
        if ok_ratio >= 0.8:
            dt_cols.add(c)
    # 原生datetime
    dt_cols |= set(df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    return list(dt_cols)


def parse_and_engineer_datetime(df: pd.DataFrame, dt_cols: List[str]) -> pd.DataFrame:
    for c in dt_cols:
        if not np.issubdtype(df[c].dtype, np.datetime64):
            df[c] = pd.to_datetime(df[c], errors="coerce", infer_datetime_format=True)
        # 缺失指示
        df[f"{c}__is_missing"] = df[c].isna().astype(int)
        # 特征
        df[f"{c}__year"] = df[c].dt.year
        df[f"{c}__month"] = df[c].dt.month
        df[f"{c}__day"] = df[c].dt.day
        df[f"{c}__dayofweek"] = df[c].dt.dayofweek
        df[f"{c}__hour"] = df[c].dt.hour
        df[f"{c}__is_weekend"] = df[c].dt.dayofweek.isin([5, 6]).astype(int)
        # 与最大日期的间隔(天)
        max_date = df[c].max()
        if pd.notna(max_date):
            df[f"{c}__age_days"] = (max_date - df[c]).dt.days
        else:
            df[f"{c}__age_days"] = np.nan
        # 原始日期列不进入模型
        df.drop(columns=[c], inplace=True)
    return df


def apply_unit_mapping(df: pd.DataFrame, unit_map: Dict[str, Dict]) -> pd.DataFrame:
    for col, rule in unit_map.items():
        if col in df.columns and "factor" in rule:
            factor = rule["factor"]
            # 仅对数值列进行换算
            if pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].astype(float) * float(factor)
    return df


def clip_outliers(df: pd.DataFrame, num_cols: List[str], method: str = "quantile",
                  q_low: float = 0.01, q_high: float = 0.99) -> pd.DataFrame:
    for c in num_cols:
        if not pd.api.types.is_numeric_dtype(df[c]):
            continue
        s = df[c]
        if method == "quantile":
            lo = s.quantile(q_low)
            hi = s.quantile(q_high)
            df[c] = s.clip(lower=lo, upper=hi)
        else:
            # IQR 方法可选:1.5*IQR
            q1 = s.quantile(0.25)
            q3 = s.quantile(0.75)
            iqr = q3 - q1
            lo = q1 - 1.5 * iqr
            hi = q3 + 1.5 * iqr
            df[c] = s.clip(lower=lo, upper=hi)
    return df


def add_missing_indicators(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    for c in cols:
        if c in df.columns:
            df[f"{c}__is_missing"] = df[c].isna().astype(int)
    return df


def preprocess_dataframe(df: pd.DataFrame, config: Dict, label_column: Optional[str]) -> Tuple:
    # 单位换算(需在类型识别前执行,避免后续转换影响)
    df = apply_unit_mapping(df, config.get("unit_mapping", {}))

    # 类型识别
    datetime_cols = infer_datetime_columns(df, config.get("datetime_columns", []))
    df = parse_and_engineer_datetime(df, datetime_cols)

    # 分类列规范化(仅针对object/category)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
    for c in cat_cols:
        # 文本标准化
        df[c] = df[c].map(normalize_text)
        # 列级同义词映射
        syn_map = config.get("category_synonyms", {}).get(c, {})
        df[c] = apply_category_synonyms(df[c], syn_map)
        # 罕见类别折叠
        df[c] = collapse_rare_categories(
            df[c],
            config["rare_category"]["min_fraction"],
            config["rare_category"]["top_k"]
        )

    # 布尔列转数值
    bool_cols = df.select_dtypes(include=["bool"]).columns.tolist()
    for c in bool_cols:
        df[c] = df[c].astype(int)

    # 数值列处理:缺失指示 + 离群剪裁
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    # 不包含标签列
    if label_column and label_column in num_cols:
        num_cols.remove(label_column)

    df = add_missing_indicators(df, num_cols)
    df = clip_outliers(
        df, num_cols,
        method=config["outlier_clip"]["method"],
        q_low=config["outlier_clip"]["q_low"],
        q_high=config["outlier_clip"]["q_high"]
    )

    # 重新识别分类与数值列(增加的指示列均为数值)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if label_column and label_column in cat_cols:
        cat_cols.remove(label_column)
    if label_column and label_column in num_cols:
        num_cols.remove(label_column)

    # 分离标签
    y = None
    if label_column and label_column in df.columns:
        y = df[label_column]
        X_df = df.drop(columns=[label_column])
    else:
        X_df = df

    # 构建预处理管道
    numeric_pipeline = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", RobustScaler())
    ])

    categorical_pipeline = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse=True))
    ])

    preprocessor = ColumnTransformer(transformers=[
        ("num", numeric_pipeline, num_cols),
        ("cat", categorical_pipeline, cat_cols)
    ])

    return X_df, y, preprocessor, num_cols, cat_cols


def main():
    parser = argparse.ArgumentParser(description="混合表格数据预处理脚本(缺失值、离群点、类别统一与编码、单位与尺度统一、时间戳特征工程)")
    parser.add_argument("--input", required=True, help="输入数据文件路径(CSV/Parquet)")
    parser.add_argument("--output_dir", required=True, help="输出目录")
    parser.add_argument("--config", default=None, help="配置文件路径(JSON)")
    parser.add_argument("--label", default=None, help="标签列名(若配置文件中已指定可省略)")
    parser.add_argument("--sep", default=",", help="CSV分隔符(默认逗号)")
    args = parser.parse_args()

    config = load_config(args.config)
    if args.label is not None:
        config["label_column"] = args.label

    # 读取数据
    if args.input.lower().endswith(".parquet"):
        df = pd.read_parquet(args.input)
    else:
        df = pd.read_csv(args.input, sep=args.sep, low_memory=False)

    # 预处理
    X_df, y, preprocessor, num_cols, cat_cols = preprocess_dataframe(df, config, config.get("label_column"))

    # 拆分(可选)
    test_size = float(config.get("test_size", 0.0))
    random_state = int(config.get("random_state", 42))

    if test_size and test_size > 0:
        if y is None:
            print("警告:未指定标签列,test_size>0仅拆分特征。", file=sys.stderr)
        X_train_df, X_test_df, y_train, y_test = train_test_split(
            X_df, y, test_size=test_size, random_state=random_state, stratify=y if y is not None else None
        )
        # 拟合与变换
        X_train = preprocessor.fit_transform(X_train_df)
        X_test = preprocessor.transform(X_test_df)
        # 输出保存
        import os
        os.makedirs(args.output_dir, exist_ok=True)

        # 保存稀疏或稠密特征
        if SCIPY_AVAILABLE and issparse(X_train):
            save_npz(f"{args.output_dir}/X_train.npz", X_train)
            save_npz(f"{args.output_dir}/X_test.npz", X_test)
        else:
            dump(X_train, f"{args.output_dir}/X_train.pkl")
            dump(X_test, f"{args.output_dir}/X_test.pkl")

        if y is not None:
            y_train.to_pickle(f"{args.output_dir}/y_train.pkl")
            y_test.to_pickle(f"{args.output_dir}/y_test.pkl")

        # 保存预处理器与列信息
        dump(preprocessor, f"{args.output_dir}/preprocessor.joblib")
        meta = {
            "numeric_columns": num_cols,
            "categorical_columns": cat_cols,
            "datetime_engineered": True,
            "config": config
        }
        with open(f"{args.output_dir}/meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        print("预处理完成并拆分训练/测试集。")
    else:
        # 全量拟合与变换
        X = preprocessor.fit_transform(X_df)
        import os
        os.makedirs(args.output_dir, exist_ok=True)

        if SCIPY_AVAILABLE and issparse(X):
            save_npz(f"{args.output_dir}/X.npz", X)
        else:
            dump(X, f"{args.output_dir}/X.pkl")

        if y is not None:
            y.to_pickle(f"{args.output_dir}/y.pkl")

        dump(preprocessor, f"{args.output_dir}/preprocessor.joblib")
        meta = {
            "numeric_columns": num_cols,
            "categorical_columns": cat_cols,
            "datetime_engineered": True,
            "config": config
        }
        with open(f"{args.output_dir}/meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)

        print("预处理完成。")


if __name__ == "__main__":
    main()

使用说明与建议
- 运行示例
  - 无配置文件(仅默认行为):python preprocess.py --input data.csv --output_dir out --label target
  - 使用配置文件:python preprocess.py --input data.csv --output_dir out --config config.json
- 配置文件示例(JSON)
  {
    "label_column": "target",
    "datetime_columns": ["event_time"],
    "unit_mapping": {
      "amount": {"factor": 0.001, "note": "从毫元换算到元"},
      "length_mm": {"factor": 0.1, "note": "从毫米到厘米"}
    },
    "category_synonyms": {
      "gender": {"m": "male", "男": "male", "f": "female", "女": "female"}
    },
    "rare_category": {"min_fraction": 0.01, "top_k": 50},
    "outlier_clip": {"method": "quantile", "q_low": 0.01, "q_high": 0.99},
    "random_state": 42,
    "test_size": 0.2
  }
- 输出内容
  - 特征矩阵:out/X.npz(或X.pkl),稀疏矩阵优先
  - 标签:out/y.pkl(若指定label_column)
  - 预处理器:out/preprocessor.joblib(用于在训练或推理阶段对新数据执行同样的变换)
  - 元信息:out/meta.json(记录列名与配置)
- 注意事项
  - 单位换算依赖明确的列级配置;脚本不对单位进行自动推断,避免错误转换
  - 高基数分类变量在OneHot后可能导致维度较大;脚本通过“罕见类别折叠”为“__other__”控制维度
  - 若拟采用线性模型,对极不平衡的类别或长尾分布建议结合降维或正则化策略
  - 时间差特征“age_days”使用该列在数据中的最大日期作为参考;如有业务基准时间(例如当前时间或某一基准日),可在配置层面加以扩展替换
  - 由于Python原生hash存在随机盐不稳定性,脚本未使用哈希编码;若需进一步缩减高基数维度,可增加哈希桶编码的自定义Transformer(稳定哈希如md5)并在ColumnTransformer中替换OneHot分支

该脚本旨在提供稳健、可复用的预处理基线,兼顾数据质量问题与混合类型特征的工程化,以便后续机器学习建模直接使用。

示例2

以下脚本面向“多源报表 CSV 与应用日志”的统一预处理,解决字段命名不统一、日期格式混杂、空值与重复记录等问题,并为后续建模、指标分析与可视化输出干净一致的数据。脚本具备以下能力:
- 自动发现并加载目录中的多份 CSV 报表与多种格式的应用日志(JSON 行、key=value、常见时间戳开头的文本日志)。
- 统一字段命名与数据类型,标准化日期时间(含混合格式、时区),规范缺失值与字符串。
- 按业务主键去重,保留最新记录。
- 输出清洗后的标准数据(Parquet/CSV)与基础指标聚合结果(按天/来源/事件)。
- 可配置字段映射、类型与缺失值策略。

使用说明简述:
- 依赖:pandas, numpy, python-dateutil
- 可选:pyarrow(写 Parquet 更快)
- 运行示例:
  python preprocess_pipeline.py --report-dir ./reports --log-dir ./logs --output-dir ./output --write-parquet

Python 脚本 preprocess_pipeline.py:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
多源 CSV 报表与应用日志预处理脚本
- 统一字段:依据可配置映射,将来源异构字段对齐到统一 schema
- 解析日期:支持混合格式与时区,统一到 UTC
- 处理缺失:标准化空值、按阈值丢弃列/行、可选简单填充
- 去重:按业务主键或近似主键策略去重
- 合并:多 CSV 与日志合并到统一事件级明细
- 导出:清洗明细 + 指标聚合
"""

import os
import re
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
from dateutil import parser as du_parser
from datetime import timezone

# ----------------------------
# 配置:统一字段名、类型与映射
# 可根据实际数据源扩展
# ----------------------------
CANONICAL_COLUMNS = [
    "event_time", "event_date", "event_name", "user_id", "session_id", "source",
    "amount", "currency", "app_version", "device", "country", "id", "created_at"
]

# 字段同义词映射(lower 后匹配)
COLUMN_SYNONYMS = {
    "event_time": ["event_time", "time", "timestamp", "ts", "event_ts", "log_time"],
    "event_name": ["event_name", "event", "action", "eventtype", "name"],
    "user_id": ["user_id", "uid", "user", "userid", "member_id", "account_id"],
    "session_id": ["session_id", "sid", "session"],
    "source": ["source", "src", "channel", "platform", "origin"],
    "amount": ["amount", "revenue", "value", "price", "amt"],
    "currency": ["currency", "curr", "ccy"],
    "app_version": ["app_version", "appver", "version"],
    "device": ["device", "device_type", "ua_device"],
    "country": ["country", "country_code", "ctry"],
    "id": ["id", "event_id", "row_id", "uuid"],
    "created_at": ["created_at", "ingest_time", "createdtime", "created_ts", "server_time"]
}

# 推荐数据类型
TARGET_DTYPES = {
    "event_time": "datetime64[ns, UTC]",   # pandas >= 1.1 支持 tz-aware dtype
    "event_date": "date",                  # 后续由 event_time 派生(日期粒度)
    "event_name": "string",
    "user_id": "string",
    "session_id": "string",
    "source": "string",
    "amount": "float64",
    "currency": "string",
    "app_version": "string",
    "device": "string",
    "country": "string",
    "id": "string",
    "created_at": "datetime64[ns, UTC]"
}

# 识别为缺失的字符串
NA_VALUES = ["", "na", "n/a", "null", "none", "-", "nan", "NaN", "NAN", "Nil"]

# 日期格式尝试集合(尽量覆盖常见格式)
DATETIME_FORMATS = [
    "%Y-%m-%d %H:%M:%S%z",
    "%Y-%m-%d %H:%M:%S",
    "%Y/%m/%d %H:%M:%S",
    "%d/%m/%Y %H:%M:%S",
    "%m/%d/%Y %H:%M:%S",
    "%Y-%m-%d",
    "%d/%m/%Y",
    "%m/%d/%Y",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
]

# 日志时间戳识别(示例:2024-01-01 12:00:00Z / 2024-01-01T12:00:00+08:00 / Jan 02 03:04:05)
LOG_TS_REGEXES = [
    re.compile(r"^\s*(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?(?:Z|[+-]\d{2}:\d{2})?)\s+"),
    re.compile(r"^\s*(?P<ts>[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+"),  # Syslog: Mon DD HH:MM:SS
]


# ----------------------------
# 工具函数
# ----------------------------
def normalize_colname(name: str) -> str:
    return re.sub(r"[^a-z0-9_]", "", name.strip().lower())

def infer_and_map_columns(cols: List[str], synonyms: Dict[str, List[str]]) -> Dict[str, str]:
    """
    根据同义词映射,自动将来源列名映射到规范列名。
    返回:source_col -> canonical_col
    """
    norm_cols = {normalize_colname(c): c for c in cols}
    mapping = {}
    for canonical, syns in synonyms.items():
        syns_norm = [normalize_colname(s) for s in syns]
        for syn in syns_norm:
            if syn in norm_cols:
                mapping[norm_cols[syn]] = canonical
                break
    return mapping

def parse_mixed_datetime(s: str) -> Optional[pd.Timestamp]:
    """
    解析混合日期时间字符串,返回 UTC tz-aware Timestamp;失败返回 None
    """
    if s is None:
        return None
    if isinstance(s, (pd.Timestamp, np.datetime64)):
        try:
            ts = pd.to_datetime(s, utc=True, errors="coerce")
            return ts if pd.notna(ts) else None
        except Exception:
            return None
    if not isinstance(s, str):
        try:
            ts = pd.to_datetime(s, utc=True, errors="coerce")
            return ts if pd.notna(ts) else None
        except Exception:
            return None
    s_clean = s.strip()
    if s_clean == "":
        return None

    # 先用指定格式尝试
    for fmt in DATETIME_FORMATS:
        try:
            dt = pd.to_datetime(s_clean, format=fmt, utc=True)
            return dt
        except Exception:
            pass
    # 再用 dateutil
    try:
        dt = du_parser.parse(s_clean)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # 无时区视为 UTC
        return pd.Timestamp(dt).tz_convert("UTC")
    except Exception:
        return None

def parse_datetime_series(sr: pd.Series) -> pd.Series:
    return sr.apply(parse_mixed_datetime)

def standardize_strings(sr: pd.Series, lower: bool = True, strip: bool = True) -> pd.Series:
    if sr.dtype == "O" or pd.api.types.is_string_dtype(sr):
        sr = sr.astype("string")
        if strip:
            sr = sr.str.strip()
        if lower:
            sr = sr.str.lower()
        # 将空字符串标准化为 <NA>
        sr = sr.replace("", pd.NA)
    return sr

def to_float_series(sr: pd.Series) -> pd.Series:
    # 清理千分位、货币符号等,转 float
    if sr.dtype == "O" or pd.api.types.is_string_dtype(sr):
        sr = sr.str.replace(r"[,\s]", "", regex=True)
        sr = sr.str.replace(r"[^\d\.\-eE]", "", regex=True)
    return pd.to_numeric(sr, errors="coerce").astype("float64")

def derive_event_date(df: pd.DataFrame) -> pd.DataFrame:
    if "event_time" in df.columns:
        df["event_date"] = df["event_time"].dt.date
    return df

def drop_sparse_columns(df: pd.DataFrame, missing_ratio_threshold: float = 0.98) -> pd.DataFrame:
    missing_ratio = df.isna().mean()
    keep_cols = missing_ratio[missing_ratio < missing_ratio_threshold].index.tolist()
    return df[keep_cols]

def deduplicate(df: pd.DataFrame) -> pd.DataFrame:
    """
    优先级:
    1) 若存在 'id',按 id 去重
    2) 否则按 ['user_id','event_time','event_name','source'] 去重
    若存在 created_at,则保留最近(最大)的一条
    """
    subset = None
    if "id" in df.columns and df["id"].notna().any():
        subset = ["id"]
    else:
        cand = [c for c in ["user_id", "event_time", "event_name", "source"] if c in df.columns]
        subset = cand if len(cand) >= 2 else df.columns.tolist()  # 退化为整行去重

    if "created_at" in df.columns:
        df = df.sort_values("created_at", ascending=True)  # 保留最新:后续 drop_duplicates(keep='last')
        df = df.drop_duplicates(subset=subset, keep="last")
    else:
        df = df.drop_duplicates(subset=subset, keep="first")
    return df

def cast_to_target_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    # 字符串标准化
    for col in ["event_name", "user_id", "session_id", "source", "currency", "app_version", "device", "country", "id"]:
        if col in df.columns:
            df[col] = standardize_strings(df[col], lower=True)
    # 金额
    if "amount" in df.columns:
        df["amount"] = to_float_series(df["amount"])
    # 日期时间
    for col in ["event_time", "created_at"]:
        if col in df.columns:
            df[col] = parse_datetime_series(df[col])
            # 显式设为 UTC tz-aware
            df[col] = df[col].dt.tz_convert("UTC")
    # 派生 event_date
    df = derive_event_date(df)
    return df

def read_csv_reports(report_dir: Path, na_values: List[str]) -> List[pd.DataFrame]:
    dfs = []
    for p in report_dir.rglob("*.csv"):
        try:
            df = pd.read_csv(
                p,
                dtype=str,
                na_values=na_values,
                keep_default_na=True,
                encoding="utf-8",
                on_bad_lines="skip"
            )
            if df.empty:
                continue
            # 原字段标准化与映射
            df.columns = [c.strip() for c in df.columns]
            col_map = infer_and_map_columns(df.columns.tolist(), COLUMN_SYNONYMS)
            df = df.rename(columns=col_map)
            df["source"] = df.get("source", pd.Series(["report"] * len(df), index=df.index))
            df["source"] = df["source"].fillna("report")
            df["__file"] = str(p)
            dfs.append(df)
        except Exception as e:
            print(f"[WARN] 读取 CSV 失败: {p} -> {e}")
    return dfs

def extract_kv_pairs(text: str) -> Dict[str, str]:
    """
    从文本中提取 key=value 对,支持 key="val with space"
    """
    pairs = {}
    for k, v in re.findall(r'(\b[a-zA-Z_][\w\-]*?)=(".*?"|\'.*?\'|\S+)', text):
        if v.startswith(("\"", "'")) and v.endswith(("\"", "'")):
            v = v[1:-1]
        pairs[k] = v
    return pairs

def parse_log_line(line: str) -> Optional[Dict]:
    # 优先尝试 JSON
    line = line.strip()
    if not line:
        return None
    try:
        obj = json.loads(line)
        if isinstance(obj, dict):
            return obj
    except Exception:
        pass

    # 尝试从常见日志格式提取时间戳
    ts_val = None
    for rgx in LOG_TS_REGEXES:
        m = rgx.match(line)
        if m:
            ts_val = m.group("ts")
            # 去掉时间戳前缀,保留剩余文本以便提取 kv
            line = line[m.end():]
            break

    kv = extract_kv_pairs(line)
    if ts_val and "timestamp" not in kv and "time" not in kv:
        kv["timestamp"] = ts_val
    return kv if kv else None

def read_logs(log_dir: Path) -> List[pd.DataFrame]:
    rows = []
    for p in list(log_dir.rglob("*.log")) + list(log_dir.rglob("*.jsonl")) + list(log_dir.rglob("*.txt")):
        try:
            with open(p, "r", encoding="utf-8", errors="ignore") as f:
                for line in f:
                    obj = parse_log_line(line)
                    if not obj:
                        continue
                    obj["__file"] = str(p)
                    rows.append(obj)
        except Exception as e:
            print(f"[WARN] 读取日志失败: {p} -> {e}")
    if not rows:
        return []
    df = pd.DataFrame(rows)
    if df.empty:
        return []
    # 映射列名
    df.columns = [c.strip() for c in df.columns]
    col_map = infer_and_map_columns(df.columns.tolist(), COLUMN_SYNONYMS)
    df = df.rename(columns=col_map)
    # 若日志中只有 timestamp,没有 event_time,进行补齐
    if "event_time" not in df.columns and "timestamp" in df.columns:
        df = df.rename(columns={"timestamp": "event_time"})
    # 若没有 source,则标记为 log
    if "source" not in df.columns:
        df["source"] = "log"
    return [df]

def harmonize_schema(dfs: List[pd.DataFrame]) -> pd.DataFrame:
    if not dfs:
        return pd.DataFrame(columns=CANONICAL_COLUMNS)
    # 统一列集合
    all_cols = set()
    for df in dfs:
        all_cols |= set(df.columns)
    all_cols |= set(CANONICAL_COLUMNS)
    all_cols = list(all_cols)
    # 对齐并合并
    aligned = []
    for df in dfs:
        missing = [c for c in all_cols if c not in df.columns]
        for c in missing:
            df[c] = pd.NA
        aligned.append(df[all_cols])
    merged = pd.concat(aligned, ignore_index=True)
    return merged

def clean_and_standardize(df: pd.DataFrame, drop_sparse_threshold: float, fill_amount_zero: bool) -> pd.DataFrame:
    # 标准化字符串空值
    for c in df.columns:
        if df[c].dtype == "O" or pd.api.types.is_string_dtype(df[c]):
            df[c] = df[c].replace(NA_VALUES, pd.NA)

    df = cast_to_target_dtypes(df)

    # 可选:金额缺失填 0
    if fill_amount_zero and "amount" in df.columns:
        df["amount"] = df["amount"].fillna(0.0)

    # 丢弃极度稀疏列
    df = drop_sparse_columns(df, missing_ratio_threshold=drop_sparse_threshold)

    # 关键字段最小约束:必须有 event_time
    if "event_time" in df.columns:
        df = df[~df["event_time"].isna()]

    # 去重
    df = deduplicate(df)

    # 排序
    sort_cols = [c for c in ["event_time", "created_at"] if c in df.columns]
    if sort_cols:
        df = df.sort_values(sort_cols).reset_index(drop=True)

    return df

def aggregate_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """
    基础指标:按天/来源/事件
    - 事件量
    - 去重用户数
    - 金额汇总
    """
    needed = ["event_date", "source", "event_name", "user_id", "amount"]
    for c in needed:
        if c not in df.columns:
            df[c] = pd.NA
    grp = df.groupby(["event_date", "source", "event_name"], dropna=False)
    out = grp.agg(
        events=("event_name", "count"),
        users=("user_id", pd.Series.nunique),
        amount_sum=("amount", "sum")
    ).reset_index()
    # 处理空值显示
    out["amount_sum"] = out["amount_sum"].fillna(0.0)
    return out

def save_outputs(df_clean: pd.DataFrame, metrics: pd.DataFrame, output_dir: Path, write_parquet: bool) -> Tuple[Path, Path]:
    output_dir.mkdir(parents=True, exist_ok=True)
    clean_path = output_dir / ("clean_events.parquet" if write_parquet else "clean_events.csv")
    metrics_path = output_dir / "metrics_daily.csv"
    if write_parquet:
        try:
            df_clean.to_parquet(clean_path, index=False)
        except Exception as e:
            print(f"[WARN] 写入 Parquet 失败,回退为 CSV:{e}")
            clean_path = output_dir / "clean_events.csv"
            df_clean.to_csv(clean_path, index=False)
    else:
        df_clean.to_csv(clean_path, index=False)
    metrics.to_csv(metrics_path, index=False)
    return clean_path, metrics_path

def report_stats(raw_dfs_count: int, df_clean: pd.DataFrame):
    print("========== 处理统计 ==========")
    print(f"合并数据源数量: {raw_dfs_count}")
    print(f"清洗后记录数: {len(df_clean)}")
    if "user_id" in df_clean.columns:
        print(f"去重用户数: {df_clean['user_id'].nunique(dropna=True)}")
    if "event_date" in df_clean.columns:
        print(f"日期范围: {df_clean['event_date'].min()} ~ {df_clean['event_date'].max()}")
    print("=============================")

def main():
    parser = argparse.ArgumentParser(description="多源 CSV + 日志预处理管道")
    parser.add_argument("--report-dir", type=str, required=False, help="CSV 报表目录")
    parser.add_argument("--log-dir", type=str, required=False, help="日志目录(.log/.jsonl/.txt)")
    parser.add_argument("--output-dir", type=str, required=True, help="输出目录")
    parser.add_argument("--drop-sparse-threshold", type=float, default=0.98, help="丢弃稀疏列阈值(缺失率>=阈值将被丢弃)")
    parser.add_argument("--fill-amount-zero", action="store_true", help="将金额缺失填充为 0")
    parser.add_argument("--write-parquet", action="store_true", help="输出 Parquet(需安装 pyarrow 或 fastparquet)")

    args = parser.parse_args()
    report_dir = Path(args.report_dir) if args.report_dir else None
    log_dir = Path(args.log_dir) if args.log_dir else None
    output_dir = Path(args.output_dir)

    dfs: List[pd.DataFrame] = []

    if report_dir and report_dir.exists():
        dfs += read_csv_reports(report_dir, NA_VALUES)
    if log_dir and log_dir.exists():
        dfs += read_logs(log_dir)

    if not dfs:
        print("[ERROR] 未在指定目录中发现可用数据源。")
        return

    df_all = harmonize_schema(dfs)
    df_clean = clean_and_standardize(
        df_all,
        drop_sparse_threshold=args.drop_sparse_threshold,
        fill_amount_zero=args.fill_amount_zero
    )
    metrics = aggregate_metrics(df_clean)

    save_outputs(df_clean, metrics, output_dir, write_parquet=args.write_parquet)
    report_stats(len(dfs), df_clean)
    print(f"输出目录: {output_dir.resolve()}")

if __name__ == "__main__":
    pd.options.mode.chained_assignment = None  # 降低不必要的警告噪声
    main()
```

关键实现细节与策略说明:
- 字段对齐:通过 COLUMN_SYNONYMS 自动将来源字段映射到统一 schema,规避“命名不统一”问题。未识别到的字段保留原列名,便于后续扩展映射。
- 日期时间解析:parse_mixed_datetime 优先尝试常见格式,再回退到 dateutil 解析,并统一转换为 UTC。无时区视为 UTC,避免偏移误差。
- 缺失值与字符串标准化:将多种“空值表示”统一为缺失;对事件名、来源、用户 ID 等进行去空格、统一小写处理,利于分组与去重。
- 金额字段:去除千分位与符号后转 float;可选填充缺失为 0(--fill-amount-zero)。
- 去重策略:优先使用唯一 id;否则退化到 ['user_id','event_time','event_name','source'] 组合主键;如有 created_at,保留最新记录。
- 日志解析:优先 JSON 行;否则用正则提取时间戳与 key=value 对,最大化结构化信息保留。
- 指标聚合:输出每日/来源/事件的事件量、去重用户数与金额汇总,便于可视化与监控。
- 可扩展性:COLUMN_SYNONYMS 与 DATETIME_FORMATS 可根据实际数据源更新;如需复杂数据类型与业务校验,可在 cast_to_target_dtypes 与 clean_and_standardize 中扩展。

如需进一步集成特定业务规则(例如事件白名单、非法事件过滤、国家与货币映射、异常值截断/Winsorization),可在清洗阶段插入对应逻辑,以保证下游分析与建模的稳定性与一致性。

适用用户

数据科学家

快速搭建稳健的数据预处理管线;统一编码与标准化;提升模型效果并缩短迭代周期。

机器学习工程师

将原始数据一键转为可训练数据;自动划分数据集与固定随机种子;便于上线、复盘与复现。

业务分析师/BI

整合多源报表与日志;清洗、去重、补全缺失;输出规范化数据表用于指标分析与可视化。

风控与运营团队

按业务规则定制特征与标签;分箱、异常标记、频次统计;快速验证策略并加速落地。

研究人员与高校教师

用于实验与教学;生成清晰脚本与说明;帮助学生理解流程并保证实验重现性。

初创团队与产品经理

在资源有限时快速形成数据处理方案;缩短原型到验证周期;降低外包与沟通成本。

解决的问题

让AI充当资深数据挖掘顾问,按你的数据集特征快速生成可运行的Python数据预处理脚本,覆盖缺失值处理、重复与异常清理、字段类型统一、类别编码、数值缩放、时间与文本特征提取、训练/验证/测试拆分等关键环节;以清晰、标准、可复用的流程,帮助团队在几分钟内把“原始数据”转变为“模型可用数据”,显著缩短建模周期、提升模型表现、减少人为失误,并支持中英文输出以便协作与交付。

特征总结

一键生成可执行预处理脚本,依据数据特性自动选择清洗、编码与标准化策略。
智能识别缺失与异常值,自动填补、剔除或变换,显著提升模型训练稳定性。
轻松完成数值与类别特征工程,支持分箱、独热、目标编码与文本向量化。
自动划分训练、验证、测试集并固定随机种子,确保结果可复现与可对比。
生成结构清晰的脚本与操作说明,便于团队协作与交付,减少沟通与改写成本。
支持按业务场景定制参数与规则,营销、风控、客服数据可快速落地应用。
内置数据质量检查与简要报告输出,直观呈现分布、缺陷与优化建议。
适配大规模数据处理,支持分步执行与批量运行,降低资源占用与等待时间。
多语言输出与专业写作风格,便于跨部门共享与审核,减少二次整理时间。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

¥15.00元
平台提供免费试用机制,
确保效果符合预期,再付费购买!

您购买后可以获得什么

获得完整提示词模板
- 共 255 tokens
- 2 个可调节参数
{ 数据集特点 } { 输出语言 }
自动加入"我的提示词库"
- 获得提示词优化器支持
- 版本化管理支持
获得社区共享的应用案例
限时免费

不要错过!

免费获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59