Popular roles are more than a source of inspiration; they are your productivity assistant. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs. Creation becomes easier, and the value more direct!
We keep the role library updated for different user needs, so you can always find a suitable entry point for inspiration.
Generate a Python script that prepares a dataset for machine learning.
The content below consists of two parts:
Part 1: Design notes
Part 2: Python script. Notes: the script supports command-line arguments; it is recommended to prepare a JSON configuration file that defines unit conversions and column-level synonym mappings. Without a configuration file, the script falls back to built-in, general-purpose defaults.
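For illustration, a minimal configuration file might look like the following. The keys mirror the defaults defined in load_config below; the column names target, signup_time, amount, and gender are placeholders, not requirements of the script:

{
  "label_column": "target",
  "datetime_columns": ["signup_time"],
  "unit_mapping": {
    "amount": {"factor": 0.001, "note": "convert from milli-units to base units"}
  },
  "category_synonyms": {
    "gender": {"m": "male", "f": "female"}
  },
  "rare_category": {"min_fraction": 0.01, "top_k": 50},
  "outlier_clip": {"method": "quantile", "q_low": 0.01, "q_high": 0.99},
  "random_state": 42,
  "test_size": 0.2
}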
""" 预处理脚本文件名示例:preprocess.py 依赖:pandas, numpy, scikit-learn, joblib 可选:scipy(用于保存稀疏矩阵,如未安装则以joblib保存) """
import argparse import json import re import sys from typing import Dict, List, Tuple, Optional
import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import OneHotEncoder, RobustScaler from sklearn.model_selection import train_test_split from joblib import dump
try: from scipy.sparse import save_npz, issparse SCIPY_AVAILABLE = True except Exception: SCIPY_AVAILABLE = False
def load_config(config_path: Optional[str]) -> Dict:
    default_config = {
        "label_column": None,       # target column name; if unset, y is not separated
        "datetime_columns": [],     # explicit timestamp column names; inferred automatically when empty
        "unit_mapping": {
            # Example:
            # "amount": {"factor": 0.001, "note": "convert from milli-yuan (1e-3 yuan) to yuan"}
        },
        "category_synonyms": {
            # Per-column synonym mapping, e.g.:
            # "gender": {"m": "male", "男": "male", "f": "female", "女": "female"}
        },
        "rare_category": {
            "min_fraction": 0.01,   # categories below this frequency are collapsed
            "top_k": 50             # keep the top-K most frequent categories, collapse the rest
        },
        "outlier_clip": {
            "method": "quantile",
            "q_low": 0.01,
            "q_high": 0.99
        },
        "random_state": 42,
        "test_size": 0.0            # if > 0, perform a train/test split
    }
    if config_path is None:
        return default_config
    with open(config_path, "r", encoding="utf-8") as f:
        user_cfg = json.load(f)

    # Recursively merge the user config into the defaults
    def merge(a, b):
        for k, v in b.items():
            if isinstance(v, dict) and k in a and isinstance(a[k], dict):
                merge(a[k], v)
            else:
                a[k] = v

    merge(default_config, user_cfg)
    return default_config


def is_boolean_like(val: str) -> Optional[str]:
    # Normalize common boolean-like text to "yes"/"no"
    if val is None:
        return None
    s = str(val).strip().lower()
    true_set = {"yes", "y", "true", "t", "1", "是", "對", "对"}
    false_set = {"no", "n", "false", "f", "0", "否", "不"}
    if s in true_set:
        return "yes"
    if s in false_set:
        return "no"
    return None


def normalize_text(s: str) -> Optional[str]:
    if pd.isna(s):
        return np.nan
    t = str(s).strip().lower()
    t = re.sub(r"\s+", " ", t)
    t = t.replace("–", "-").replace("—", "-")
    # Map common missing-value tokens to NaN
    missing_tokens = {"", "na", "n/a", "none", "null", "-", "?", "unknown", "未知", "未提供"}
    if t in missing_tokens:
        return np.nan
    # Normalize boolean-like values
    bl = is_boolean_like(t)
    if bl is not None:
        return bl
    return t


def apply_category_synonyms(series: pd.Series, mapping: Dict[str, str]) -> pd.Series:
    if not mapping:
        return series
    # Normalize the mapping keys
    norm_map = {normalize_text(k): v for k, v in mapping.items()}
    return series.map(lambda x: norm_map.get(normalize_text(x), normalize_text(x)))


def collapse_rare_categories(series: pd.Series, min_fraction: float, top_k: int) -> pd.Series:
    s = series.astype("object")
    vc = s.value_counts(dropna=False)
    total = len(s)
    # Keep the top_k categories plus any category with frequency >= min_fraction
    keep = set(vc.nlargest(top_k).index)
    keep |= set(vc[vc / total >= min_fraction].index)
    return s.map(lambda x: x if x in keep else "other")


def infer_datetime_columns(df: pd.DataFrame, explicit_cols: List[str]) -> List[str]:
    dt_cols = set([c for c in explicit_cols if c in df.columns])
    # Auto-inference: object columns where a high share of values parses as datetime
    obj_cols = df.select_dtypes(include=["object"]).columns
    for c in obj_cols:
        s = df[c].dropna().astype(str)
        if s.empty:
            continue
        sample = s.sample(min(len(s), 1000), random_state=0)
        parsed = pd.to_datetime(sample, errors="coerce", utc=False)
        ok_ratio = parsed.notna().mean()
        if ok_ratio >= 0.8:
            dt_cols.add(c)
    # Native datetime columns
    dt_cols |= set(df.select_dtypes(include=["datetime64[ns]", "datetime64[ns, UTC]"]).columns)
    return list(dt_cols)


def parse_and_engineer_datetime(df: pd.DataFrame, dt_cols: List[str]) -> pd.DataFrame:
    for c in dt_cols:
        if not pd.api.types.is_datetime64_any_dtype(df[c]):  # covers both naive and tz-aware datetimes
            df[c] = pd.to_datetime(df[c], errors="coerce")
        # Missing indicator
        df[f"{c}__is_missing"] = df[c].isna().astype(int)
        # Calendar features
        df[f"{c}__year"] = df[c].dt.year
        df[f"{c}__month"] = df[c].dt.month
        df[f"{c}__day"] = df[c].dt.day
        df[f"{c}__dayofweek"] = df[c].dt.dayofweek
        df[f"{c}__hour"] = df[c].dt.hour
        df[f"{c}__is_weekend"] = df[c].dt.dayofweek.isin([5, 6]).astype(int)
        # Days elapsed relative to the latest date in the column
        max_date = df[c].max()
        if pd.notna(max_date):
            df[f"{c}__age_days"] = (max_date - df[c]).dt.days
        else:
            df[f"{c}__age_days"] = np.nan
        # The raw datetime column does not go into the model
        df.drop(columns=[c], inplace=True)
    return df


def apply_unit_mapping(df: pd.DataFrame, unit_map: Dict[str, Dict]) -> pd.DataFrame:
    for col, rule in unit_map.items():
        if col in df.columns and "factor" in rule:
            factor = rule["factor"]
            # Only convert numeric columns
            if pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].astype(float) * float(factor)
    return df


def clip_outliers(df: pd.DataFrame, num_cols: List[str], method: str = "quantile",
                  q_low: float = 0.01, q_high: float = 0.99) -> pd.DataFrame:
    for c in num_cols:
        if not pd.api.types.is_numeric_dtype(df[c]):
            continue
        s = df[c]
        if method == "quantile":
            lo = s.quantile(q_low)
            hi = s.quantile(q_high)
            df[c] = s.clip(lower=lo, upper=hi)
        else:
            # Alternative IQR method: 1.5 * IQR fences
            q1 = s.quantile(0.25)
            q3 = s.quantile(0.75)
            iqr = q3 - q1
            lo = q1 - 1.5 * iqr
            hi = q3 + 1.5 * iqr
            df[c] = s.clip(lower=lo, upper=hi)
    return df


def add_missing_indicators(df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    for c in cols:
        if c in df.columns:
            df[f"{c}__is_missing"] = df[c].isna().astype(int)
    return df
def preprocess_dataframe(df: pd.DataFrame, config: Dict, label_column: Optional[str]) -> Tuple:
    # Unit conversion (done before type handling so later casts are not affected)
    df = apply_unit_mapping(df, config.get("unit_mapping", {}))

    # Type detection
    datetime_cols = infer_datetime_columns(df, config.get("datetime_columns", []))
    df = parse_and_engineer_datetime(df, datetime_cols)

    # Normalize categorical columns (object/category only)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
    for c in cat_cols:
        # Text normalization
        df[c] = df[c].map(normalize_text)
        # Column-level synonym mapping
        syn_map = config.get("category_synonyms", {}).get(c, {})
        df[c] = apply_category_synonyms(df[c], syn_map)
        # Collapse rare categories
        df[c] = collapse_rare_categories(
            df[c],
            config["rare_category"]["min_fraction"],
            config["rare_category"]["top_k"]
        )

    # Convert boolean columns to integers
    bool_cols = df.select_dtypes(include=["bool"]).columns.tolist()
    for c in bool_cols:
        df[c] = df[c].astype(int)

    # Numeric columns: missing indicators + outlier clipping
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    # Exclude the label column
    if label_column and label_column in num_cols:
        num_cols.remove(label_column)
    df = add_missing_indicators(df, num_cols)
    df = clip_outliers(
        df, num_cols,
        method=config["outlier_clip"]["method"],
        q_low=config["outlier_clip"]["q_low"],
        q_high=config["outlier_clip"]["q_high"]
    )

    # Re-detect categorical and numeric columns (the added indicator columns are numeric)
    cat_cols = df.select_dtypes(include=["object", "category"]).columns.tolist()
    num_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    if label_column and label_column in cat_cols:
        cat_cols.remove(label_column)
    if label_column and label_column in num_cols:
        num_cols.remove(label_column)

    # Separate the label
    y = None
    if label_column and label_column in df.columns:
        y = df[label_column]
        X_df = df.drop(columns=[label_column])
    else:
        X_df = df

    # Build the preprocessing pipelines
    numeric_pipeline = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("scaler", RobustScaler())
    ])
    categorical_pipeline = Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="most_frequent")),
        ("ohe", OneHotEncoder(handle_unknown="ignore", sparse=True))  # scikit-learn >= 1.2 renames this parameter to sparse_output
    ])
    preprocessor = ColumnTransformer(transformers=[
        ("num", numeric_pipeline, num_cols),
        ("cat", categorical_pipeline, cat_cols)
    ])

    return X_df, y, preprocessor, num_cols, cat_cols
def main():
    parser = argparse.ArgumentParser(
        description="Preprocessing script for mixed tabular data (missing values, outliers, "
                    "category normalization and encoding, unit/scale unification, timestamp feature engineering)"
    )
    parser.add_argument("--input", required=True, help="Path to the input data file (CSV/Parquet)")
    parser.add_argument("--output_dir", required=True, help="Output directory")
    parser.add_argument("--config", default=None, help="Path to the configuration file (JSON)")
    parser.add_argument("--label", default=None, help="Label column name (optional if set in the config file)")
    parser.add_argument("--sep", default=",", help="CSV delimiter (default: comma)")
    args = parser.parse_args()

    config = load_config(args.config)
    if args.label is not None:
        config["label_column"] = args.label

    # Read the data
    if args.input.lower().endswith(".parquet"):
        df = pd.read_parquet(args.input)
    else:
        df = pd.read_csv(args.input, sep=args.sep, low_memory=False)

    # Preprocess
    X_df, y, preprocessor, num_cols, cat_cols = preprocess_dataframe(df, config, config.get("label_column"))

    # Optional train/test split
    test_size = float(config.get("test_size", 0.0))
    random_state = int(config.get("random_state", 42))
    if test_size and test_size > 0:
        if y is None:
            print("Warning: no label column specified; with test_size > 0 only the features are split.", file=sys.stderr)
        X_train_df, X_test_df, y_train, y_test = train_test_split(
            X_df, y, test_size=test_size, random_state=random_state, stratify=y if y is not None else None
        )
        # Fit and transform
        X_train = preprocessor.fit_transform(X_train_df)
        X_test = preprocessor.transform(X_test_df)

        # Save outputs
        import os
        os.makedirs(args.output_dir, exist_ok=True)
        # Save sparse or dense features
        if SCIPY_AVAILABLE and issparse(X_train):
            save_npz(f"{args.output_dir}/X_train.npz", X_train)
            save_npz(f"{args.output_dir}/X_test.npz", X_test)
        else:
            dump(X_train, f"{args.output_dir}/X_train.pkl")
            dump(X_test, f"{args.output_dir}/X_test.pkl")
        if y is not None:
            y_train.to_pickle(f"{args.output_dir}/y_train.pkl")
            y_test.to_pickle(f"{args.output_dir}/y_test.pkl")
        # Save the preprocessor and column metadata
        dump(preprocessor, f"{args.output_dir}/preprocessor.joblib")
        meta = {
            "numeric_columns": num_cols,
            "categorical_columns": cat_cols,
            "datetime_engineered": True,
            "config": config
        }
        with open(f"{args.output_dir}/meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)
        print("Preprocessing finished; train/test split written.")
    else:
        # Fit and transform on the full dataset
        X = preprocessor.fit_transform(X_df)
        import os
        os.makedirs(args.output_dir, exist_ok=True)
        if SCIPY_AVAILABLE and issparse(X):
            save_npz(f"{args.output_dir}/X.npz", X)
        else:
            dump(X, f"{args.output_dir}/X.pkl")
        if y is not None:
            y.to_pickle(f"{args.output_dir}/y.pkl")
        dump(preprocessor, f"{args.output_dir}/preprocessor.joblib")
        meta = {
            "numeric_columns": num_cols,
            "categorical_columns": cat_cols,
            "datetime_engineered": True,
            "config": config
        }
        with open(f"{args.output_dir}/meta.json", "w", encoding="utf-8") as f:
            json.dump(meta, f, ensure_ascii=False, indent=2)
        print("Preprocessing finished.")


if __name__ == "__main__":
    main()
Usage notes and recommendations
This script is intended as a robust, reusable preprocessing baseline that handles common data-quality issues and the engineering of mixed-type features, so its output can feed directly into subsequent machine-learning modeling.
The following script targets unified preprocessing of multi-source CSV reports and application logs. It addresses inconsistent field naming, mixed date formats, null values, and duplicate records, and outputs clean, consistent data for downstream modeling, metric analysis, and visualization. Its capabilities are summarized in the docstring at the top of the script.
Brief usage:
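A typical invocation, based on the argument parser defined in the script below (directory names are illustrative):

python preprocess_pipeline.py --report-dir ./reports --log-dir ./logs --output-dir ./out --write-parquet

This produces clean_events.parquet (or clean_events.csv as a fallback) and metrics_daily.csv in the output directory, along with a processing summary printed to stdout.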
Python script preprocess_pipeline.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
多源 CSV 报表与应用日志预处理脚本
- 统一字段:依据可配置映射,将来源异构字段对齐到统一 schema
- 解析日期:支持混合格式与时区,统一到 UTC
- 处理缺失:标准化空值、按阈值丢弃列/行、可选简单填充
- 去重:按业务主键或近似主键策略去重
- 合并:多 CSV 与日志合并到统一事件级明细
- 导出:清洗明细 + 指标聚合
"""
import os
import re
import json
import argparse
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import pandas as pd
import numpy as np
from dateutil import parser as du_parser
from datetime import timezone
# ----------------------------
# Configuration: canonical field names, types, and mappings
# Extend as needed for your actual data sources
# ----------------------------
CANONICAL_COLUMNS = [
"event_time", "event_date", "event_name", "user_id", "session_id", "source",
"amount", "currency", "app_version", "device", "country", "id", "created_at"
]
# Field-name synonym mapping (matched after lowercasing)
COLUMN_SYNONYMS = {
"event_time": ["event_time", "time", "timestamp", "ts", "event_ts", "log_time"],
"event_name": ["event_name", "event", "action", "eventtype", "name"],
"user_id": ["user_id", "uid", "user", "userid", "member_id", "account_id"],
"session_id": ["session_id", "sid", "session"],
"source": ["source", "src", "channel", "platform", "origin"],
"amount": ["amount", "revenue", "value", "price", "amt"],
"currency": ["currency", "curr", "ccy"],
"app_version": ["app_version", "appver", "version"],
"device": ["device", "device_type", "ua_device"],
"country": ["country", "country_code", "ctry"],
"id": ["id", "event_id", "row_id", "uuid"],
"created_at": ["created_at", "ingest_time", "createdtime", "created_ts", "server_time"]
}
# Target (recommended) data types
TARGET_DTYPES = {
    "event_time": "datetime64[ns, UTC]",  # tz-aware dtype, supported in pandas >= 1.1
    "event_date": "date",                 # derived later from event_time (date granularity)
"event_name": "string",
"user_id": "string",
"session_id": "string",
"source": "string",
"amount": "float64",
"currency": "string",
"app_version": "string",
"device": "string",
"country": "string",
"id": "string",
"created_at": "datetime64[ns, UTC]"
}
# Strings treated as missing values
NA_VALUES = ["", "na", "n/a", "null", "none", "-", "nan", "NaN", "NAN", "Nil"]
# Datetime formats to try (covering common formats as broadly as possible)
DATETIME_FORMATS = [
"%Y-%m-%d %H:%M:%S%z",
"%Y-%m-%d %H:%M:%S",
"%Y/%m/%d %H:%M:%S",
"%d/%m/%Y %H:%M:%S",
"%m/%d/%Y %H:%M:%S",
"%Y-%m-%d",
"%d/%m/%Y",
"%m/%d/%Y",
"%Y-%m-%dT%H:%M:%S%z",
"%Y-%m-%dT%H:%M:%S.%f%z",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ",
]
# Log timestamp detection (examples: 2024-01-01 12:00:00Z / 2024-01-01T12:00:00+08:00 / Jan 02 03:04:05)
LOG_TS_REGEXES = [
re.compile(r"^\s*(?P<ts>\d{4}-\d{2}-\d{2}[ T]\d{2}:\d{2}:\d{2}(?:[.,]\d+)?(?:Z|[+-]\d{2}:\d{2})?)\s+"),
re.compile(r"^\s*(?P<ts>[A-Z][a-z]{2}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+"), # Syslog: Mon DD HH:MM:SS
]
# ----------------------------
# Utility functions
# ----------------------------
def normalize_colname(name: str) -> str:
return re.sub(r"[^a-z0-9_]", "", name.strip().lower())
def infer_and_map_columns(cols: List[str], synonyms: Dict[str, List[str]]) -> Dict[str, str]:
"""
根据同义词映射,自动将来源列名映射到规范列名。
返回:source_col -> canonical_col
"""
norm_cols = {normalize_colname(c): c for c in cols}
mapping = {}
for canonical, syns in synonyms.items():
syns_norm = [normalize_colname(s) for s in syns]
for syn in syns_norm:
if syn in norm_cols:
mapping[norm_cols[syn]] = canonical
break
return mapping
def parse_mixed_datetime(s: str) -> Optional[pd.Timestamp]:
"""
解析混合日期时间字符串,返回 UTC tz-aware Timestamp;失败返回 None
"""
if s is None:
return None
if isinstance(s, (pd.Timestamp, np.datetime64)):
try:
ts = pd.to_datetime(s, utc=True, errors="coerce")
return ts if pd.notna(ts) else None
except Exception:
return None
if not isinstance(s, str):
try:
ts = pd.to_datetime(s, utc=True, errors="coerce")
return ts if pd.notna(ts) else None
except Exception:
return None
s_clean = s.strip()
if s_clean == "":
return None
    # Try the explicit formats first
for fmt in DATETIME_FORMATS:
try:
dt = pd.to_datetime(s_clean, format=fmt, utc=True)
return dt
except Exception:
pass
    # Fall back to dateutil
try:
dt = du_parser.parse(s_clean)
if dt.tzinfo is None:
            dt = dt.replace(tzinfo=timezone.utc)  # treat naive datetimes as UTC
return pd.Timestamp(dt).tz_convert("UTC")
except Exception:
return None
def parse_datetime_series(sr: pd.Series) -> pd.Series:
return sr.apply(parse_mixed_datetime)
def standardize_strings(sr: pd.Series, lower: bool = True, strip: bool = True) -> pd.Series:
if sr.dtype == "O" or pd.api.types.is_string_dtype(sr):
sr = sr.astype("string")
if strip:
sr = sr.str.strip()
if lower:
sr = sr.str.lower()
        # Normalize empty strings to <NA>
sr = sr.replace("", pd.NA)
return sr
def to_float_series(sr: pd.Series) -> pd.Series:
    # Strip thousands separators, currency symbols, etc., then convert to float
if sr.dtype == "O" or pd.api.types.is_string_dtype(sr):
sr = sr.str.replace(r"[,\s]", "", regex=True)
sr = sr.str.replace(r"[^\d\.\-eE]", "", regex=True)
return pd.to_numeric(sr, errors="coerce").astype("float64")
def derive_event_date(df: pd.DataFrame) -> pd.DataFrame:
if "event_time" in df.columns:
df["event_date"] = df["event_time"].dt.date
return df
def drop_sparse_columns(df: pd.DataFrame, missing_ratio_threshold: float = 0.98) -> pd.DataFrame:
missing_ratio = df.isna().mean()
keep_cols = missing_ratio[missing_ratio < missing_ratio_threshold].index.tolist()
return df[keep_cols]
def deduplicate(df: pd.DataFrame) -> pd.DataFrame:
"""
优先级:
1) 若存在 'id',按 id 去重
2) 否则按 ['user_id','event_time','event_name','source'] 去重
若存在 created_at,则保留最近(最大)的一条
"""
subset = None
if "id" in df.columns and df["id"].notna().any():
subset = ["id"]
else:
cand = [c for c in ["user_id", "event_time", "event_name", "source"] if c in df.columns]
        subset = cand if len(cand) >= 2 else df.columns.tolist()  # fall back to whole-row deduplication
if "created_at" in df.columns:
        df = df.sort_values("created_at", ascending=True)  # keep the latest via drop_duplicates(keep='last') below
df = df.drop_duplicates(subset=subset, keep="last")
else:
df = df.drop_duplicates(subset=subset, keep="first")
return df
def cast_to_target_dtypes(df: pd.DataFrame) -> pd.DataFrame:
    # Standardize string columns
for col in ["event_name", "user_id", "session_id", "source", "currency", "app_version", "device", "country", "id"]:
if col in df.columns:
df[col] = standardize_strings(df[col], lower=True)
    # Amount column
if "amount" in df.columns:
df["amount"] = to_float_series(df["amount"])
    # Datetime columns
for col in ["event_time", "created_at"]:
if col in df.columns:
df[col] = parse_datetime_series(df[col])
            # Explicitly coerce to a UTC tz-aware datetime64 dtype
            # (parse_datetime_series may return an object-dtype Series of Timestamps/None)
            df[col] = pd.to_datetime(df[col], utc=True, errors="coerce")
    # Derive event_date
df = derive_event_date(df)
return df
def read_csv_reports(report_dir: Path, na_values: List[str]) -> List[pd.DataFrame]:
dfs = []
for p in report_dir.rglob("*.csv"):
try:
df = pd.read_csv(
p,
dtype=str,
na_values=na_values,
keep_default_na=True,
encoding="utf-8",
on_bad_lines="skip"
)
if df.empty:
continue
            # Standardize and map the original column names
df.columns = [c.strip() for c in df.columns]
col_map = infer_and_map_columns(df.columns.tolist(), COLUMN_SYNONYMS)
df = df.rename(columns=col_map)
df["source"] = df.get("source", pd.Series(["report"] * len(df), index=df.index))
df["source"] = df["source"].fillna("report")
df["__file"] = str(p)
dfs.append(df)
except Exception as e:
print(f"[WARN] 读取 CSV 失败: {p} -> {e}")
return dfs
def extract_kv_pairs(text: str) -> Dict[str, str]:
"""
从文本中提取 key=value 对,支持 key="val with space"
"""
pairs = {}
for k, v in re.findall(r'(\b[a-zA-Z_][\w\-]*?)=(".*?"|\'.*?\'|\S+)', text):
if v.startswith(("\"", "'")) and v.endswith(("\"", "'")):
v = v[1:-1]
pairs[k] = v
return pairs
def parse_log_line(line: str) -> Optional[Dict]:
    # Try JSON first
line = line.strip()
if not line:
return None
try:
obj = json.loads(line)
if isinstance(obj, dict):
return obj
except Exception:
pass
    # Try to extract a timestamp from common log formats
ts_val = None
for rgx in LOG_TS_REGEXES:
m = rgx.match(line)
if m:
ts_val = m.group("ts")
            # Strip the timestamp prefix and keep the rest of the line for key=value extraction
line = line[m.end():]
break
kv = extract_kv_pairs(line)
if ts_val and "timestamp" not in kv and "time" not in kv:
kv["timestamp"] = ts_val
return kv if kv else None
def read_logs(log_dir: Path) -> List[pd.DataFrame]:
rows = []
for p in list(log_dir.rglob("*.log")) + list(log_dir.rglob("*.jsonl")) + list(log_dir.rglob("*.txt")):
try:
with open(p, "r", encoding="utf-8", errors="ignore") as f:
for line in f:
obj = parse_log_line(line)
if not obj:
continue
obj["__file"] = str(p)
rows.append(obj)
except Exception as e:
print(f"[WARN] 读取日志失败: {p} -> {e}")
if not rows:
return []
df = pd.DataFrame(rows)
if df.empty:
return []
    # Map column names
df.columns = [c.strip() for c in df.columns]
col_map = infer_and_map_columns(df.columns.tolist(), COLUMN_SYNONYMS)
df = df.rename(columns=col_map)
    # If the logs only have a timestamp column and no event_time, fill event_time from it
if "event_time" not in df.columns and "timestamp" in df.columns:
df = df.rename(columns={"timestamp": "event_time"})
    # If there is no source column, mark these rows as "log"
if "source" not in df.columns:
df["source"] = "log"
return [df]
def harmonize_schema(dfs: List[pd.DataFrame]) -> pd.DataFrame:
if not dfs:
return pd.DataFrame(columns=CANONICAL_COLUMNS)
    # Build the unified column set
all_cols = set()
for df in dfs:
all_cols |= set(df.columns)
all_cols |= set(CANONICAL_COLUMNS)
all_cols = list(all_cols)
    # Align and concatenate
aligned = []
for df in dfs:
missing = [c for c in all_cols if c not in df.columns]
for c in missing:
df[c] = pd.NA
aligned.append(df[all_cols])
merged = pd.concat(aligned, ignore_index=True)
return merged
def clean_and_standardize(df: pd.DataFrame, drop_sparse_threshold: float, fill_amount_zero: bool) -> pd.DataFrame:
    # Standardize string null markers
for c in df.columns:
if df[c].dtype == "O" or pd.api.types.is_string_dtype(df[c]):
df[c] = df[c].replace(NA_VALUES, pd.NA)
df = cast_to_target_dtypes(df)
    # Optional: fill missing amounts with 0
if fill_amount_zero and "amount" in df.columns:
df["amount"] = df["amount"].fillna(0.0)
    # Drop extremely sparse columns
df = drop_sparse_columns(df, missing_ratio_threshold=drop_sparse_threshold)
    # Minimal constraint on key fields: event_time must be present
if "event_time" in df.columns:
df = df[~df["event_time"].isna()]
    # Deduplicate
df = deduplicate(df)
    # Sort
sort_cols = [c for c in ["event_time", "created_at"] if c in df.columns]
if sort_cols:
df = df.sort_values(sort_cols).reset_index(drop=True)
return df
def aggregate_metrics(df: pd.DataFrame) -> pd.DataFrame:
"""
基础指标:按天/来源/事件
- 事件量
- 去重用户数
- 金额汇总
"""
needed = ["event_date", "source", "event_name", "user_id", "amount"]
for c in needed:
if c not in df.columns:
df[c] = pd.NA
grp = df.groupby(["event_date", "source", "event_name"], dropna=False)
out = grp.agg(
events=("event_name", "count"),
users=("user_id", pd.Series.nunique),
amount_sum=("amount", "sum")
).reset_index()
    # Replace missing amount sums with 0 for cleaner output
out["amount_sum"] = out["amount_sum"].fillna(0.0)
return out
def save_outputs(df_clean: pd.DataFrame, metrics: pd.DataFrame, output_dir: Path, write_parquet: bool) -> Tuple[Path, Path]:
output_dir.mkdir(parents=True, exist_ok=True)
clean_path = output_dir / ("clean_events.parquet" if write_parquet else "clean_events.csv")
metrics_path = output_dir / "metrics_daily.csv"
if write_parquet:
try:
df_clean.to_parquet(clean_path, index=False)
except Exception as e:
print(f"[WARN] 写入 Parquet 失败,回退为 CSV:{e}")
clean_path = output_dir / "clean_events.csv"
df_clean.to_csv(clean_path, index=False)
else:
df_clean.to_csv(clean_path, index=False)
metrics.to_csv(metrics_path, index=False)
return clean_path, metrics_path
def report_stats(raw_dfs_count: int, df_clean: pd.DataFrame):
print("========== 处理统计 ==========")
print(f"合并数据源数量: {raw_dfs_count}")
print(f"清洗后记录数: {len(df_clean)}")
if "user_id" in df_clean.columns:
print(f"去重用户数: {df_clean['user_id'].nunique(dropna=True)}")
if "event_date" in df_clean.columns:
print(f"日期范围: {df_clean['event_date'].min()} ~ {df_clean['event_date'].max()}")
print("=============================")
def main():
    parser = argparse.ArgumentParser(description="Preprocessing pipeline for multi-source CSV reports + logs")
    parser.add_argument("--report-dir", type=str, required=False, help="Directory of CSV reports")
    parser.add_argument("--log-dir", type=str, required=False, help="Directory of logs (.log/.jsonl/.txt)")
    parser.add_argument("--output-dir", type=str, required=True, help="Output directory")
    parser.add_argument("--drop-sparse-threshold", type=float, default=0.98, help="Threshold for dropping sparse columns (columns with missing ratio >= threshold are dropped)")
    parser.add_argument("--fill-amount-zero", action="store_true", help="Fill missing amounts with 0")
    parser.add_argument("--write-parquet", action="store_true", help="Write Parquet output (requires pyarrow or fastparquet)")
args = parser.parse_args()
report_dir = Path(args.report_dir) if args.report_dir else None
log_dir = Path(args.log_dir) if args.log_dir else None
output_dir = Path(args.output_dir)
dfs: List[pd.DataFrame] = []
if report_dir and report_dir.exists():
dfs += read_csv_reports(report_dir, NA_VALUES)
if log_dir and log_dir.exists():
dfs += read_logs(log_dir)
if not dfs:
print("[ERROR] 未在指定目录中发现可用数据源。")
return
df_all = harmonize_schema(dfs)
df_clean = clean_and_standardize(
df_all,
drop_sparse_threshold=args.drop_sparse_threshold,
fill_amount_zero=args.fill_amount_zero
)
metrics = aggregate_metrics(df_clean)
save_outputs(df_clean, metrics, output_dir, write_parquet=args.write_parquet)
report_stats(len(dfs), df_clean)
print(f"输出目录: {output_dir.resolve()}")
if __name__ == "__main__":
    pd.options.mode.chained_assignment = None  # reduce unnecessary warning noise
main()
Key implementation details and strategies are documented in the comments and docstrings within the script above.
If you need to integrate additional business rules (for example, an event whitelist, filtering of invalid events, country and currency mapping, or outlier truncation/winsorization), insert the corresponding logic into the cleaning stage to keep downstream analysis and modeling stable and consistent, as sketched below.
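As a sketch of that extension point: the helper name filter_event_whitelist and the whitelist values below are hypothetical, not part of the script above; the call would go inside clean_and_standardize, just before deduplicate(df).

import pandas as pd

# Hypothetical whitelist of business-approved event names
ALLOWED_EVENTS = {"login", "purchase", "page_view"}

def filter_event_whitelist(df: pd.DataFrame, allowed: set) -> pd.DataFrame:
    # Keep only rows whose event_name is in the whitelist; frames without
    # an event_name column are returned unchanged.
    if "event_name" in df.columns:
        return df[df["event_name"].isin(allowed)]
    return df

# Inside clean_and_standardize, before the call to deduplicate(df):
# df = filter_event_whitelist(df, ALLOWED_EVENTS)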
Let AI act as a senior data-mining consultant that quickly generates runnable Python preprocessing scripts tailored to your dataset, covering the key steps of missing-value handling, duplicate and outlier cleanup, field-type unification, categorical encoding, numeric scaling, time and text feature extraction, and train/validation/test splitting. With a clear, standardized, reusable workflow it helps teams turn raw data into model-ready data within minutes, shortening the modeling cycle, improving model performance, and reducing human error, with output in Chinese or English for collaboration and delivery.
Quickly build a robust data-preprocessing pipeline; unify encoding and scaling; improve model performance and shorten iteration cycles.
Turn raw data into training-ready data in one step; split datasets automatically with a fixed random seed; simplify deployment, review, and reproduction.
Integrate multi-source reports and logs; clean, deduplicate, and fill in missing values; output standardized tables for metric analysis and visualization.
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight scenarios.
Turn the prompt template into an API: your program can modify the template parameters as needed and call it directly through the interface, making automation and batch processing easy. Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts work seamlessly across different AI tools.