Generate a Python script that prepares a dataset for machine learning.
The content below has two parts:
- Design notes: preprocessing goals, key processing steps, and strategies
- A runnable Python script that systematically preprocesses mixed tabular data, covering missing values, outliers, category unification and encoding, unit and scale normalization, and timestamp feature engineering, and that outputs a feature matrix ready for machine learning along with a persisted preprocessing pipeline

Part 1: Design notes
- Input data characteristics
  - Mixed types: numeric, categorical, timestamp
  - Data quality issues: missing values, outliers, inconsistent category encoding, inconsistent units and scales
- Processing strategy
  - Column type detection
    - Numeric columns: pandas numeric dtypes
    - Categorical columns: object/category or low-cardinality strings; encoded after automatic normalization
    - Timestamp columns: native datetime columns, or object columns where a high share of sampled values parses as datetime (based on a parse-success threshold)
  - Missing values
    - Numeric: median imputation + RobustScaler; missing-indicator columns are kept
    - Categorical: mode imputation; tokens such as "NA/N/A/unknown/—/?" are mapped to missing during preprocessing
    - Timestamp: parse failures are treated as missing; missing-indicator columns are kept
  - Outliers
    - Numeric columns are clipped at quantiles (winsorized at 1% and 99% by default) to reduce the influence of extreme values
    - RobustScaler is chosen for robustness against remaining anomalies
  - Category unification and encoding
    - Text normalization: strip whitespace, unify case, collapse repeated whitespace, unify common boolean values (是/否, yes/no, true/false, 1/0)
    - Rare categories are folded into "__other__" (based on a frequency threshold and top-k)
    - OneHotEncoder with unknown categories ignored
  - Timestamp feature engineering
    - Extract year, month, day, day of week, hour, weekend flag, and days elapsed relative to the dataset's maximum date (a proxy for "recency")
    - Raw timestamp columns do not enter the model directly
  - Unit and scale normalization
    - Column-level unit conversion via a configuration mapping (multiplication by a conversion factor); no inference is done without configuration
  - Reusability and persistence
    - ColumnTransformer + Pipeline build a reusable preprocessor
    - The processed feature matrix, a column-name mapping, and the preprocessor object are exported

Part 2: Python script
Note: the script supports command-line arguments; it is recommended to prepare a JSON configuration file defining unit conversions and column-level synonym mappings. Without a configuration file, the script falls back to built-in defaults.

```python
"""
Example file name: preprocess.py
Dependencies: pandas, numpy, scikit-learn, joblib
Optional: scipy (to save sparse matrices; falls back to joblib if not installed)
"""
import argparse
import json
import re
import sys
from typing import Dict, List, Tuple, Optional

import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, RobustScaler
from sklearn.model_selection import train_test_split
from joblib import dump

# Optional: use scipy to save sparse matrices if it is installed
try:
    from scipy.sparse import save_npz, issparse
    SCIPY_AVAILABLE = True
except Exception:
    SCIPY_AVAILABLE = False


def load_config(config_path: Optional[str]) -> Dict:
    default_config = {
        "label_column": None,      # target column name; if unset, y is not separated
        "datetime_columns": [],    # explicit timestamp column names; inferred automatically when empty
        "unit_mapping": {
            # Example:
            # "amount": {"factor": 0.001, "note": "convert from 1e-3 yuan to yuan"}
        },
        "category_synonyms": {
            # Per-column synonym mapping, e.g.:
            # "gender": {"m": "male", "男": "male", "f": "female", "女": "female"}
        },
        "rare_category": {
            "min_fraction": 0.01,  # categories below this frequency are folded
            "top_k": 50            # keep the K most frequent categories, fold the rest
        },
        "outlier_clip": {
            "method": "quantile",
            "q_low": 0.01,
            "q_high": 0.99
        },
        "random_state": 42,
        "test_size": 0.0           # if > 0, perform a train/test split
    }
    if config_path is None:
        return default_config
    with open(config_path, "r", encoding="utf-8") as f:
        user_cfg = json.load(f)

    # Recursively merge the user config into the defaults
    def merge(a, b):
        for k, v in b.items():
            if isinstance(v, dict) and k in a and isinstance(a[k], dict):
                merge(a[k], v)
            else:
                a[k] = v

    merge(default_config, user_cfg)
    return default_config


def is_boolean_like(val: str) -> Optional[str]:
    # Normalize common boolean text to "yes"/"no"
    if val is None:
        return None
    s = str(val).strip().lower()
    true_set = {"yes", "y", "true", "t", "1", "是", "對", "对"}
    false_set = {"no", "n", "false", "f", "0", "否", "不"}
    if s in true_set:
        return "yes"
    if s in false_set:
        return "no"
    return None


def normalize_text(s: str) -> Optional[str]:
    if pd.isna(s):
        return np.nan
    t = str(s).strip().lower()
    t = re.sub(r"\s+", " ", t)
    t = t.replace("–", "-").replace("—", "-")
    # Map common missing-value markers to NaN
    missing_tokens = {"", "na", "n/a", "none", "null", "-", "?", "unknown", "未知", "未提供"}
    if t in missing_tokens:
        return np.nan
    # Unify booleans
    bl = is_boolean_like(t)
    if bl is not None:
        return bl
    return t


def apply_category_synonyms(series: pd.Series, mapping: Dict[str, str]) -> pd.Series:
    if not mapping:
        return series
    # Normalize mapping keys
    norm_map = {normalize_text(k): v for k, v in mapping.items()}
    return series.map(lambda x: norm_map.get(normalize_text(x), normalize_text(x)))


def collapse_rare_categories(series: pd.Series, min_fraction: float, top_k: int) -> pd.Series:
    s = series.astype("object")
    vc = s.value_counts(dropna=False)
    total = len(s)
    # Keep the top_k categories and those with frequency >= min_fraction
    keep = set(vc.nlargest(top_k).index)
    keep |= set(vc[vc / total >= min_fraction].index)
    return s.map(lambda x: x if x in keep else "__other__")


def infer_datetime_columns(df: pd.DataFrame, explicit_cols: List[str]) -> List[str]:
    dt_cols = set([c for c in explicit_cols if c in df.columns])
    # Auto-detect: object columns where a high share of values parses as datetime
    obj_cols = df.select_dtypes(include=["object"]).columns
    for c in obj_cols:
        s = df[c].dropna().astype(str)
        if s.empty:
            continue
        sample = s.sample(min(len(s), 1000), random_state=0)
        parsed = pd.to_datetime(sample, errors="coerce", utc=False)
        ok_ratio = parsed.notna().mean()
        if ok_ratio >= 0.8:
            dt_cols.add(c)
    # Native datetime columns
    dt_cols |= set(df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    return list(dt_cols)


def parse_and_engineer_datetime(df: pd.DataFrame, dt_cols: List[str]) -> pd.DataFrame:
    for c in dt_cols:
        if not pd.api.types.is_datetime64_any_dtype(df[c]):
            df[c] = pd.to_datetime(df[c], errors="coerce")
        # Missing indicator
        df[f"{c}__is_missing"] = df[c].isna().astype(int)
        # Features
        df[f"{c}__year"] = df[c].dt.year
        df[f"{c}__month"] = df[c].dt.month
        df[f"{c}__day"] = df[c].dt.day
        df[f"{c}__dayofweek"] = df[c].dt.dayofweek
        df[f"{c}__hour"] = df[c].dt.hour
        df[f"{c}__is_weekend"] = df[c].dt.dayofweek.isin([5, 6]).astype(int)
        # Days elapsed relative to the maximum date in the column
        max_date = df[c].max()
        if pd.notna(max_date):
            df[f"{c}__age_days"] = (max_date - df[c]).dt.days
        else:
            df[f"{c}__age_days"] = np.nan
        # The raw date column does not enter the model
        df.drop(columns=[c], inplace=True)
    return df


def apply_unit_mapping(df: pd.DataFrame, unit_map: Dict[str, Dict]) -> pd.DataFrame:
    for col, rule in unit_map.items():
        if col in df.columns and "factor" in rule:
            factor = rule["factor"]
            # Only convert numeric columns
            if pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].astype(float) * float(factor)
    return df


def clip_outliers(df: pd.DataFrame, num_cols: List[str], method: str = "quantile",
                  q_low: float = 0.01, q_high: float = 0.99) -> pd.DataFrame:
    for c in num_cols:
        if not pd.api.types.is_numeric_dtype(df[c]):
            continue
        s = df[c]
        if method == "quantile":
            lo = s.quantile(q_low)
            hi = s.quantile(q_high)
            df[c] = s.clip(lower=lo, upper=hi)
        else:
            # Optional IQR method: 1.5 * IQR
            q1 = s.quantile(0.25)
            q3 = s.quantile(0.75)
            iqr = q3 - q1
            lo = q1 - 1.5 * iqr
            hi = q3 + 1.5 * iqr
            df[c] = s.clip(lower=lo, upper=hi)
    return df


def add_missing_indicators(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    for c in cols:
        if c in df.columns:
            df[f"{c}__is_missing"] = df[c].isna().astype(int)
    return df


def preprocess_dataframe(df: pd.DataFrame, config: Dict, label_column: Optional[str]) -> Tuple:
    # Unit conversion (run before type detection so later transforms are unaffected)
    df = apply_unit_mapping(df, config.get("unit_mapping", {}))

    # Type detection
    datetime_cols = infer_datetime_columns(df, config.get("datetime_columns", []))
    df = parse_and_engineer_datetime(df, datetime_cols)

    # Categorical normalization (object/category columns only)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
    for c in cat_cols:
        # Text standardization
        df[c] = df[c].map(normalize_text)
        # Column-level synonym mapping
        syn_map = config.get("category_synonyms", {}).get(c, {})
        df[c] = apply_category_synonyms(df[c], syn_map)
        # Fold rare categories
        df[c] = collapse_rare_categories(
            df[c],
            config["rare_category"]["min_fraction"],
            config["rare_category"]["top_k"]
        )

    # Boolean columns to numeric
    bool_cols = df.select_dtypes(include=["bool"]).columns.tolist()
    for c in bool_cols:
        df[c] = df[c].astype(int)

    # Numeric columns: missing indicators + outlier clipping
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    # Exclude the label column
    if label_column and label_column in num_cols:
        num_cols.remove(label_column)
    df = add_missing_indicators(df, num_cols)
    df = clip_outliers(
        df, num_cols,
        method=config["outlier_clip"]["method"],
        q_low=config["outlier_clip"]["q_low"],
        q_high=config["outlier_clip"]["q_high"]
    )

    # Re-detect categorical and numeric columns (the added indicator columns are numeric)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if label_column and label_column in cat_cols:
        cat_cols.remove(label_column)
    if label_column and label_column in num_cols:
        num_cols.remove(label_column)

    # Separate the label
    y = None
    if label_column and label_column in df.columns:
        y = df[label_column]
        X_df = df.drop(columns=[label_column])
    else:
        X_df = df

    # Build the preprocessing pipelines
    numeric_pipeline = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", RobustScaler())
    ])
    categorical_pipeline = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        # Use sparse=True instead of sparse_output=True on scikit-learn < 1.2
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse_output=True))
    ])
    preprocessor = ColumnTransformer(transformers=[
        ("num", numeric_pipeline, num_cols),
        ("cat", categorical_pipeline, cat_cols)
    ])
    return X_df, y, preprocessor, num_cols, cat_cols


def main():
    parser = argparse.ArgumentParser(
        description="Preprocessing for mixed tabular data (missing values, outliers, "
                    "category unification and encoding, unit/scale normalization, "
                    "timestamp feature engineering)")
    parser.add_argument("--input", required=True, help="Input data file path (CSV/Parquet)")
    parser.add_argument("--output_dir", required=True, help="Output directory")
    parser.add_argument("--config", default=None, help="Configuration file path (JSON)")
    parser.add_argument("--label", default=None, help="Label column name (optional if set in the config file)")
    parser.add_argument("--sep", default=",", help="CSV separator (default: comma)")
    args = parser.parse_args()

    config = load_config(args.config)
    if args.label is not None:
        config["label_column"] = args.label

    # Read data
    if args.input.lower().endswith(".parquet"):
        df = pd.read_parquet(args.input)
    else:
        df = pd.read_csv(args.input, sep=args.sep, low_memory=False)

    # Preprocess
    X_df, y, preprocessor, num_cols, cat_cols = preprocess_dataframe(df, config, config.get("label_column"))

    # Optional split
    test_size = float(config.get("test_size", 0.0))
    random_state = int(config.get("random_state", 42))
    if test_size and test_size > 0:
        if y is None:
            print("Warning: no label column specified; test_size > 0 splits features only.", file=sys.stderr)
            X_train_df, X_test_df = train_test_split(
                X_df, test_size=test_size, random_state=random_state
            )
            y_train = y_test = None
        else:
            X_train_df, X_test_df, y_train, y_test = train_test_split(
                X_df, y, test_size=test_size, random_state=random_state, stratify=y
            )

        # Fit and transform
        X_train = preprocessor.fit_transform(X_train_df)
        X_test = preprocessor.transform(X_test_df)

        # Save outputs
        import os
        os.makedirs(args.output_dir, exist_ok=True)

        # Save sparse or dense features
        if SCIPY_AVAILABLE and issparse(X_train):
            save_npz(f"{args.output_dir}/X_train.npz", X_train)
            save_npz(f"{args.output_dir}/X_test.npz", X_test)
        else:
            dump(X_train, f"{args.output_dir}/X_train.pkl")
            dump(X_test, f"{args.output_dir}/X_test.pkl")
        if y is not None:
            y_train.to_pickle(f"{args.output_dir}/y_train.pkl")
            y_test.to_pickle(f"{args.output_dir}/y_test.pkl")

        # Save the preprocessor and column metadata
        dump(preprocessor, f"{args.output_dir}/preprocessor.joblib")
        meta = {
            "numeric_columns": num_cols,
            "categorical_columns": cat_cols,
            "datetime_engineered": True,
            "config": config
        }
        with open(f"{args.output_dir}/meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)
        print("Preprocessing finished; train/test split written.")
    else:
        # Fit and transform on the full dataset
        X = preprocessor.fit_transform(X_df)
        import os
        os.makedirs(args.output_dir, exist_ok=True)
        if SCIPY_AVAILABLE and issparse(X):
            save_npz(f"{args.output_dir}/X.npz", X)
        else:
            dump(X, f"{args.output_dir}/X.pkl")
        if y is not None:
            y.to_pickle(f"{args.output_dir}/y.pkl")
        dump(preprocessor, f"{args.output_dir}/preprocessor.joblib")
        meta = {
            "numeric_columns": num_cols,
            "categorical_columns": cat_cols,
            "datetime_engineered": True,
            "config": config
        }
        with open(f"{args.output_dir}/meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)
        print("Preprocessing finished.")


if __name__ == "__main__":
    main()
```

Usage notes and recommendations
- Example runs
  - Without a config file (defaults only): python preprocess.py --input data.csv --output_dir out --label target
  - With a config file: python preprocess.py --input data.csv --output_dir out --config config.json
- Example configuration file (JSON)

```json
{
  "label_column": "target",
  "datetime_columns": ["event_time"],
  "unit_mapping": {
    "amount": {"factor": 0.001, "note": "convert from 1e-3 yuan to yuan"},
    "length_mm": {"factor": 0.1, "note": "millimeters to centimeters"}
  },
  "category_synonyms": {
    "gender": {"m": "male", "男": "male", "f": "female", "女": "female"}
  },
  "rare_category": {"min_fraction": 0.01, "top_k": 50},
  "outlier_clip": {"method": "quantile", "q_low": 0.01, "q_high": 0.99},
  "random_state": 42,
  "test_size": 0.2
}
```

- Outputs
  - Feature matrix: out/X.npz (or X.pkl); sparse output is preferred
  - Labels: out/y.pkl (when label_column is specified)
  - Preprocessor: out/preprocessor.joblib, which applies the same transformations to new data at training or inference time (see the reuse sketch after these notes)
  - Metadata: out/meta.json (column names and configuration)
- Caveats
  - Unit conversion relies on explicit column-level configuration; the script does not infer units, to avoid incorrect conversions
  - High-cardinality categorical variables can produce many one-hot columns; folding rare categories into "__other__" keeps dimensionality under control
  - For linear models, heavily imbalanced categories or long-tailed distributions may call for dimensionality reduction or regularization
  - The "age_days" feature uses the column's maximum date in the data as its reference; if a business reference time exists (e.g. the current time or a fixed anchor date), extend the configuration to use it instead
  - Because Python's built-in hash is salted and unstable across runs, the script does not use hash encoding; to further reduce high-cardinality dimensionality, add a custom hashing-bucket transformer (using a stable hash such as md5) and swap it in for the one-hot branch of the ColumnTransformer

The script is meant as a robust, reusable preprocessing baseline that addresses data-quality issues and mixed-type feature engineering so downstream machine learning can consume the output directly.
The script below targets unified preprocessing of multi-source report CSVs and application logs. It addresses inconsistent field naming, mixed date formats, missing values, and duplicate records, and outputs clean, consistent data for downstream modeling, metric analysis, and visualization. The script can:
- Automatically discover and load multiple CSV reports and application logs in several formats (JSON lines, key=value, plain-text logs starting with a common timestamp) from the given directories.
- Unify field names and data types, standardize datetimes (including mixed formats and time zones), and normalize missing values and strings.
- Deduplicate by business key, keeping the most recent record.
- Output cleaned, standardized data (Parquet/CSV) plus basic metric aggregations (by day/source/event).
- Support configurable field mappings, types, and missing-value strategies.

Quick usage notes:
- Dependencies: pandas, numpy, python-dateutil
- Optional: pyarrow (faster Parquet writing)
- Example run:
  python preprocess_pipeline.py --report-dir ./reports --log-dir ./logs --output-dir ./output --write-parquet

Python script preprocess_pipeline.py:

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Preprocessing for multi-source CSV reports and application logs
- Unify fields: align heterogeneous source fields to a canonical schema via a configurable mapping
- Parse dates: support mixed formats and time zones, normalized to UTC
- Handle missing data: standardize null markers, drop columns/rows by threshold, optional simple imputation
- Deduplicate: by business key or an approximate-key fallback
- Merge: combine multiple CSVs and logs into a unified event-level table
- Export: cleaned detail data + metric aggregations
"""
import os
import re
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import pandas as pd
import numpy as np
from dateutil import parser as du_parser
from datetime import timezone

# ----------------------------
# Configuration: canonical field names, types, and mappings
# Extend these to match your actual data sources
# ----------------------------
CANONICAL_COLUMNS = [
    "event_time", "event_date", "event_name", "user_id", "session_id",
    "source", "amount", "currency", "app_version", "device", "country",
    "id", "created_at"
]

# Field synonym mapping (matched after lower-casing)
COLUMN_SYNONYMS = {
    "event_time": ["event_time", "time", "timestamp", "ts", "event_ts", "log_time"],
    "event_name": ["event_name", "event", "action", "eventtype", "name"],
    "user_id": ["user_id", "uid", "user", "userid", "member_id", "account_id"],
    "session_id": ["session_id", "sid", "session"],
    "source": ["source", "src", "channel", "platform", "origin"],
    "amount": ["amount", "revenue", "value", "price", "amt"],
    "currency": ["currency", "curr", "ccy"],
    "app_version": ["app_version", "appver", "version"],
    "device": ["device", "device_type", "ua_device"],
    "country": ["country", "country_code", "ctry"],
    "id": ["id", "event_id", "row_id", "uuid"],
    "created_at": ["created_at", "ingest_time", "createdtime", "created_ts", "server_time"]
}

# Recommended target dtypes
TARGET_DTYPES = {
    "event_time": "datetime64[ns, UTC]",  # pandas >= 1.1 supports tz-aware dtypes
    "event_date": "date",                 # derived later from event_time (date granularity)
    "event_name": "string",
    "user_id": "string",
    "session_id": "string",
    "source": "string",
    "amount": "float64",
    "currency": "string",
    "app_version": "string",
    "device": "string",
    "country": "string",
    "id": "string",
    "created_at": "datetime64[ns, UTC]"
}

# Strings treated as missing
NA_VALUES = ["", "na", "n/a", "null", "none", "-", "nan", "NaN", "NAN", "Nil"]

# Datetime formats to try (covering common cases)
DATETIME_FORMATS = [
    "%Y-%m-%d %H:%M:%S%z",
    "%Y-%m-%d %H:%M:%S",
    "%Y/%m/%d %H:%M:%S",
    "%d/%m/%Y %H:%M:%S",
    "%m/%d/%Y %H:%M:%S",
    "%Y-%m-%d",
    "%d/%m/%Y",
    "%m/%d/%Y",
    "%Y-%m-%dT%H:%M:%S%z",
    "%Y-%m-%dT%H:%M:%S.%f%z",
    "%Y-%m-%dT%H:%M:%S.%fZ",
    "%Y-%m-%dT%H:%M:%SZ",
]

# Log timestamp detection (e.g. 2024-01-01 12:00:00Z / 2024-01-01T12:00:00+08:00 / Jan 02 03:04:05)
LOG_TS_REGEXES = [
    re.compile(r"^\s*(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?(?:Z|[+-]\d{2}:\d{2})?)\s+"),
    re.compile(r"^\s*(?P<ts>[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+"),  # Syslog: Mon DD HH:MM:SS
]

# ----------------------------
# Helper functions
# ----------------------------
def normalize_colname(name: str) -> str:
    return re.sub(r"[^a-z0-9_]", "", name.strip().lower())


def infer_and_map_columns(cols: List[str], synonyms: Dict[str, List[str]]) -> Dict[str, str]:
    """
    Map source column names to canonical names via the synonym table.
    Returns: source_col -> canonical_col
    """
    norm_cols = {normalize_colname(c): c for c in cols}
    mapping = {}
    for canonical, syns in synonyms.items():
        syns_norm = [normalize_colname(s) for s in syns]
        for syn in syns_norm:
            if syn in norm_cols:
                mapping[norm_cols[syn]] = canonical
                break
    return mapping


def parse_mixed_datetime(s: str) -> Optional[pd.Timestamp]:
    """
    Parse a mixed-format datetime value into a UTC tz-aware Timestamp; return None on failure.
    """
    if s is None:
        return None
    if isinstance(s, (pd.Timestamp, np.datetime64)):
        try:
            ts = pd.to_datetime(s, utc=True, errors="coerce")
            return ts if pd.notna(ts) else None
        except Exception:
            return None
    if not isinstance(s, str):
        try:
            ts = pd.to_datetime(s, utc=True, errors="coerce")
            return ts if pd.notna(ts) else None
        except Exception:
            return None
    s_clean = s.strip()
    if s_clean == "":
        return None
    # Try explicit formats first
    for fmt in DATETIME_FORMATS:
        try:
            dt = pd.to_datetime(s_clean, format=fmt, utc=True)
            return dt
        except Exception:
            pass
    # Fall back to dateutil
    try:
        dt = du_parser.parse(s_clean)
        if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # treat naive timestamps as UTC
        return pd.Timestamp(dt).tz_convert("UTC")
    except Exception:
        return None


def parse_datetime_series(sr: pd.Series) -> pd.Series:
    # Coerce the per-value results into a tz-aware datetime64 column
    return pd.to_datetime(sr.apply(parse_mixed_datetime), utc=True, errors="coerce")


def standardize_strings(sr: pd.Series, lower: bool = True, strip: bool = True) -> pd.Series:
    if sr.dtype == "O" or pd.api.types.is_string_dtype(sr):
        sr = sr.astype("string")
        if strip:
            sr = sr.str.strip()
        if lower:
            sr = sr.str.lower()
        # Normalize empty strings to <NA>
        sr = sr.replace("", pd.NA)
    return sr


def to_float_series(sr: pd.Series) -> pd.Series:
    # Strip thousands separators, currency symbols, etc., then convert to float
    if sr.dtype == "O" or pd.api.types.is_string_dtype(sr):
        sr = sr.astype("string")
        sr = sr.str.replace(r"[,\s]", "", regex=True)
        sr = sr.str.replace(r"[^\d\.\-eE]", "", regex=True)
    return pd.to_numeric(sr, errors="coerce").astype("float64")


def derive_event_date(df: pd.DataFrame) -> pd.DataFrame:
    if "event_time" in df.columns:
        df["event_date"] = df["event_time"].dt.date
    return df


def drop_sparse_columns(df: pd.DataFrame, missing_ratio_threshold: float = 0.98) -> pd.DataFrame:
    missing_ratio = df.isna().mean()
    keep_cols = missing_ratio[missing_ratio < missing_ratio_threshold].index.tolist()
    return df[keep_cols]


def deduplicate(df: pd.DataFrame) -> pd.DataFrame:
    """
    Priority:
    1) If 'id' exists, deduplicate by id
    2) Otherwise deduplicate by ['user_id', 'event_time', 'event_name', 'source']
    If created_at exists, keep the most recent (largest) record.
    """
    subset = None
    if "id" in df.columns and df["id"].notna().any():
        subset = ["id"]
    else:
        cand = [c for c in ["user_id", "event_time", "event_name", "source"] if c in df.columns]
        subset = cand if len(cand) >= 2 else df.columns.tolist()  # fall back to whole-row dedup
    if "created_at" in df.columns:
        df = df.sort_values("created_at", ascending=True)  # keep latest via drop_duplicates(keep='last')
        df = df.drop_duplicates(subset=subset, keep="last")
    else:
        df = df.drop_duplicates(subset=subset, keep="first")
    return df


def cast_to_target_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    # String standardization
    for col in ["event_name", "user_id", "session_id", "source", "currency",
                "app_version", "device", "country", "id"]:
        if col in df.columns:
            df[col] = standardize_strings(df[col], lower=True)
    # Amount
    if "amount" in df.columns:
        df["amount"] = to_float_series(df["amount"])
    # Datetimes
    for col in ["event_time", "created_at"]:
        if col in df.columns:
            df[col] = parse_datetime_series(df[col])
            # Explicitly UTC tz-aware
            df[col] = df[col].dt.tz_convert("UTC")
    # Derive event_date
    df = derive_event_date(df)
    return df


def read_csv_reports(report_dir: Path, na_values: List[str]) -> List[pd.DataFrame]:
    dfs = []
    for p in report_dir.rglob("*.csv"):
        try:
            df = pd.read_csv(
                p,
                dtype=str,
                na_values=na_values,
                keep_default_na=True,
                encoding="utf-8",
                on_bad_lines="skip"
            )
            if df.empty:
                continue
            # Normalize and map source columns
            df.columns = [c.strip() for c in df.columns]
            col_map = infer_and_map_columns(df.columns.tolist(), COLUMN_SYNONYMS)
            df = df.rename(columns=col_map)
            df["source"] = df.get("source", pd.Series(["report"] * len(df), index=df.index))
            df["source"] = df["source"].fillna("report")
            df["__file"] = str(p)
            dfs.append(df)
        except Exception as e:
            print(f"[WARN] Failed to read CSV: {p} -> {e}")
    return dfs


def extract_kv_pairs(text: str) -> Dict[str, str]:
    """
    Extract key=value pairs from text, supporting key="val with space".
    """
    pairs = {}
    for k, v in re.findall(r'(\b[a-zA-Z_][\w\-]*?)=(".*?"|\'.*?\'|\S+)', text):
        if v.startswith(("\"", "'")) and v.endswith(("\"", "'")):
            v = v[1:-1]
        pairs[k] = v
    return pairs


def parse_log_line(line: str) -> Optional[Dict]:
    # Try JSON first
    line = line.strip()
    if not line:
        return None
    try:
        obj = json.loads(line)
        if isinstance(obj, dict):
            return obj
    except Exception:
        pass
    # Try to extract a timestamp from common log formats
    ts_val = None
    for rgx in LOG_TS_REGEXES:
        m = rgx.match(line)
        if m:
            ts_val = m.group("ts")
            # Drop the timestamp prefix; keep the rest for key=value extraction
            line = line[m.end():]
            break
    kv = extract_kv_pairs(line)
    if ts_val and "timestamp" not in kv and "time" not in kv:
        kv["timestamp"] = ts_val
    return kv if kv else None


def read_logs(log_dir: Path) -> List[pd.DataFrame]:
    rows = []
    for p in list(log_dir.rglob("*.log")) + list(log_dir.rglob("*.jsonl")) + list(log_dir.rglob("*.txt")):
        try:
            with open(p, "r", encoding="utf-8", errors="ignore") as f:
                for line in f:
                    obj = parse_log_line(line)
                    if not obj:
                        continue
                    obj["__file"] = str(p)
                    rows.append(obj)
        except Exception as e:
            print(f"[WARN] Failed to read log: {p} -> {e}")
    if not rows:
        return []
    df = pd.DataFrame(rows)
    if df.empty:
        return []
    # Map column names
    df.columns = [c.strip() for c in df.columns]
    col_map = infer_and_map_columns(df.columns.tolist(), COLUMN_SYNONYMS)
    df = df.rename(columns=col_map)
    # If the log only has a timestamp and no event_time, promote it
    if "event_time" not in df.columns and "timestamp" in df.columns:
        df = df.rename(columns={"timestamp": "event_time"})
    # Mark source as log if absent
    if "source" not in df.columns:
        df["source"] = "log"
    return [df]


def harmonize_schema(dfs: List[pd.DataFrame]) -> pd.DataFrame:
    if not dfs:
        return pd.DataFrame(columns=CANONICAL_COLUMNS)
    # Unify the column set
    all_cols = set()
    for df in dfs:
        all_cols |= set(df.columns)
    all_cols |= set(CANONICAL_COLUMNS)
    all_cols = list(all_cols)
    # Align and concatenate
    aligned = []
    for df in dfs:
        missing = [c for c in all_cols if c not in df.columns]
        for c in missing:
            df[c] = pd.NA
        aligned.append(df[all_cols])
    merged = pd.concat(aligned, ignore_index=True)
    return merged


def clean_and_standardize(df: pd.DataFrame, drop_sparse_threshold: float, fill_amount_zero: bool) -> pd.DataFrame:
    # Normalize string null markers
    for c in df.columns:
        if df[c].dtype == "O" or pd.api.types.is_string_dtype(df[c]):
            df[c] = df[c].replace(NA_VALUES, pd.NA)
    df = cast_to_target_dtypes(df)
    # Optional: fill missing amounts with 0
    if fill_amount_zero and "amount" in df.columns:
        df["amount"] = df["amount"].fillna(0.0)
    # Drop extremely sparse columns
    df = drop_sparse_columns(df, missing_ratio_threshold=drop_sparse_threshold)
    # Minimal constraint on key fields: event_time must be present
    if "event_time" in df.columns:
        df = df[~df["event_time"].isna()]
    # Deduplicate
    df = deduplicate(df)
    # Sort
    sort_cols = [c for c in ["event_time", "created_at"] if c in df.columns]
    if sort_cols:
        df = df.sort_values(sort_cols).reset_index(drop=True)
    return df


def aggregate_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """
    Basic metrics by day/source/event:
    - event count
    - distinct users
    - total amount
    """
    needed = ["event_date", "source", "event_name", "user_id", "amount"]
    for c in needed:
        if c not in df.columns:
            df[c] = pd.NA
    grp = df.groupby(["event_date", "source", "event_name"], dropna=False)
    out = grp.agg(
        events=("event_name", "count"),
        users=("user_id", pd.Series.nunique),
        amount_sum=("amount", "sum")
    ).reset_index()
    # Normalize null display
    out["amount_sum"] = out["amount_sum"].fillna(0.0)
    return out


def save_outputs(df_clean: pd.DataFrame, metrics: pd.DataFrame, output_dir: Path, write_parquet: bool) -> Tuple[Path, Path]:
    output_dir.mkdir(parents=True, exist_ok=True)
    clean_path = output_dir / ("clean_events.parquet" if write_parquet else "clean_events.csv")
    metrics_path = output_dir / "metrics_daily.csv"
    if write_parquet:
        try:
            df_clean.to_parquet(clean_path, index=False)
        except Exception as e:
            print(f"[WARN] Failed to write Parquet, falling back to CSV: {e}")
            clean_path = output_dir / "clean_events.csv"
            df_clean.to_csv(clean_path, index=False)
    else:
        df_clean.to_csv(clean_path, index=False)
    metrics.to_csv(metrics_path, index=False)
    return clean_path, metrics_path


def report_stats(raw_dfs_count: int, df_clean: pd.DataFrame):
    print("========== Processing summary ==========")
    print(f"Merged data sources: {raw_dfs_count}")
    print(f"Records after cleaning: {len(df_clean)}")
    if "user_id" in df_clean.columns:
        print(f"Distinct users: {df_clean['user_id'].nunique(dropna=True)}")
    if "event_date" in df_clean.columns:
        print(f"Date range: {df_clean['event_date'].min()} ~ {df_clean['event_date'].max()}")
    print("========================================")


def main():
    parser = argparse.ArgumentParser(description="Multi-source CSV + log preprocessing pipeline")
    parser.add_argument("--report-dir", type=str, required=False, help="Directory of CSV reports")
    parser.add_argument("--log-dir", type=str, required=False, help="Directory of logs (.log/.jsonl/.txt)")
    parser.add_argument("--output-dir", type=str, required=True, help="Output directory")
    parser.add_argument("--drop-sparse-threshold", type=float, default=0.98,
                        help="Sparse-column threshold (columns with missing ratio >= threshold are dropped)")
    parser.add_argument("--fill-amount-zero", action="store_true", help="Fill missing amounts with 0")
    parser.add_argument("--write-parquet", action="store_true", help="Write Parquet (requires pyarrow or fastparquet)")
    args = parser.parse_args()

    report_dir = Path(args.report_dir) if args.report_dir else None
    log_dir = Path(args.log_dir) if args.log_dir else None
    output_dir = Path(args.output_dir)

    dfs: List[pd.DataFrame] = []
    if report_dir and report_dir.exists():
        dfs += read_csv_reports(report_dir, NA_VALUES)
    if log_dir and log_dir.exists():
        dfs += read_logs(log_dir)
    if not dfs:
        print("[ERROR] No usable data sources found in the given directories.")
        return

    df_all = harmonize_schema(dfs)
    df_clean = clean_and_standardize(
        df_all,
        drop_sparse_threshold=args.drop_sparse_threshold,
        fill_amount_zero=args.fill_amount_zero
    )
    metrics = aggregate_metrics(df_clean)
    save_outputs(df_clean, metrics, output_dir, write_parquet=args.write_parquet)
    report_stats(len(dfs), df_clean)
    print(f"Output directory: {output_dir.resolve()}")


if __name__ == "__main__":
    pd.options.mode.chained_assignment = None  # reduce unnecessary warning noise
    main()
```

Key implementation details and strategies:
- Field alignment: COLUMN_SYNONYMS automatically maps source fields to the canonical schema, resolving inconsistent naming. Unrecognized fields keep their original names so the mapping can be extended later.
- Datetime parsing: parse_mixed_datetime tries common formats first, then falls back to dateutil, and normalizes everything to UTC. Naive timestamps are treated as UTC to avoid offset errors.
- Missing values and string standardization: the many different "null" markers are unified into missing values; event names, sources, and user IDs are stripped and lower-cased, which helps grouping and deduplication.
- Amount fields: thousands separators and symbols are removed before conversion to float; missing amounts can optionally be filled with 0 (--fill-amount-zero).
- Deduplication: a unique id is preferred; otherwise the composite key ['user_id', 'event_time', 'event_name', 'source'] is used, and if created_at exists, the most recent record is kept.
- Log parsing: JSON lines are tried first; otherwise a regex extracts the timestamp and key=value pairs, preserving as much structure as possible.
- Metric aggregation: daily event counts, distinct users, and amount totals per source/event, ready for visualization and monitoring.
- Extensibility: COLUMN_SYNONYMS and DATETIME_FORMATS can be updated for new data sources; more complex typing and business validation can be added in cast_to_target_dtypes and clean_and_standardize.

If specific business rules need to be integrated (for example an event whitelist, invalid-event filtering, country and currency mapping, or outlier clipping/winsorization), insert the corresponding logic into the cleaning stage, as sketched below, to keep downstream analysis and modeling stable and consistent.
Quickly build a robust data preprocessing pipeline; unify encoding and standardization; improve model performance and shorten iteration cycles.
Turn raw data into training-ready data in one step; split datasets automatically and fix random seeds; make deployment, review, and reproduction easier.
Consolidate multi-source reports and logs; clean, deduplicate, and fill in missing values; output standardized tables for metric analysis and visualization.
Tailor features and labels to business rules; apply binning, anomaly flagging, and frequency counts; validate strategies quickly and accelerate rollout.
For experiments and teaching; generate clear scripts with explanations; help students understand the workflow and keep experiments reproducible.
Form a data processing plan quickly when resources are limited; shorten the prototype-to-validation cycle; reduce outsourcing and communication costs.
Let AI act as a senior data-mining consultant that quickly generates a runnable Python preprocessing script tailored to your dataset, covering missing-value handling, duplicate and outlier cleanup, field-type unification, categorical encoding, numeric scaling, time and text feature extraction, and train/validation/test splitting. With a clear, standard, reusable workflow, it helps teams turn raw data into model-ready data within minutes, significantly shortening the modeling cycle, improving model performance, reducing human error, and supporting output in both Chinese and English for collaboration and delivery.
Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify template parameters freely and call it directly through the interface, enabling automation and batch processing. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between different AI tools.