Popular roles are more than a source of inspiration; they are productivity helpers. Carefully curated role prompts let you generate high-quality content quickly, spark new ideas, and find the solution that best fits your needs, making creation easier and its value more direct.
We keep the role library updated for different user needs, so you can always find a suitable entry point for inspiration.
This role generates an executable Python standardization script based on your dataset's characteristics and processing requirements. It supports several scaling methods and missing-value strategies, shortening the path from raw data to cleaned, model-ready data.
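For orientation, the scaling methods used in the scripts below differ only in the statistics they rely on. A minimal self-contained sketch on toy data (the robust and Min-Max variants correspond to what the first two scripts do; the last line roughly mimics the quantile transform in the third):

import numpy as np

x = np.array([1.0, 2.0, 2.5, 3.0, 100.0])               # toy column with one outlier
z_score = (x - x.mean()) / x.std()                        # standard (z-score) scaling
min_max = (x - x.min()) / (x.max() - x.min())             # Min-Max scaling to [0, 1]
q1, med, q3 = np.percentile(x, [25, 50, 75])
robust = (x - med) / (q3 - q1)                            # robust scaling: (x - median) / IQR
uniform = np.searchsorted(np.sort(x), x, side="right") / x.size   # rough empirical-CDF (quantile) transform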
Below is a PySpark standardization script intended for production use, covering the business constraints and goals described above. It handles timezone unification, deduplication, multi-currency conversion to CNY, missing-value treatment, robust scaling, and category/region normalization; implementation notes follow the code.
Usage
Script: standardize_orders.py
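A typical invocation might look like the following; the flags match the argparse definitions at the bottom of the script, while the paths are placeholders for your own data and reference files:

spark-submit standardize_orders.py \
  --csv_paths /data/orders/csv/ \
  --parquet_paths /data/orders/parquet/ \
  --fx_path /data/ref/fx_rates.csv \
  --cat_alias_path /data/ref/category_alias.yaml \
  --region_map_path /data/ref/region_mapping.csv \
  --output_path /data/clean/orders_std \
  --use_knn_impute false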
The code is as follows:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import argparse
import sys
import yaml
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T
# ----------------------------
# 配置与常量
# ----------------------------
NUMERIC_COLS_BASE = [
"quantity",
"unit_price",
"discount_amount",
"order_amount",
]
CURRENCY_COLS = ["unit_price", "discount_amount", "order_amount"]
CATEGORICAL_COLS = [
"category_lvl1",
"category_lvl2",
"category_lvl3",
"province",
"city",
"payment_channel",
"currency",
"coupon_code",
"invoice_type",
]
ID_COLS = ["order_id", "user_id", "product_id"]
# 强制设置会话时区为 Asia/Shanghai(关键:统一时区)
SPARK_CONF = {
"spark.sql.session.timeZone": "Asia/Shanghai",
"spark.sql.legacy.timeParserPolicy": "LEGACY", # 兼容多格式解析
"spark.sql.shuffle.partitions": "400", # 可按集群规模调优
"spark.executor.memoryOverhead": "2g"
}
# ----------------------------
# 工具函数
# ----------------------------
def start_spark(app_name="order_standardization"):
builder = SparkSession.builder.appName(app_name)
for k, v in SPARK_CONF.items():
builder = builder.config(k, v)
spark = builder.getOrCreate()
return spark
def read_sources(spark, csv_paths, parquet_paths):
dfs = []
if csv_paths:
df_csv = (
spark.read
.option("header", True)
.option("multiLine", False)
.option("escape", '"')
.option("quote", '"')
.option("mode", "PERMISSIVE")
.csv(csv_paths)
)
dfs.append(df_csv)
if parquet_paths:
        df_parq = spark.read.parquet(*parquet_paths)  # 展开路径列表,支持多个 Parquet 目录
dfs.append(df_parq)
if not dfs:
raise ValueError("未提供有效的 CSV/Parquet 路径")
# 对齐列(可能存在来源列差异)
cols_all = set()
for d in dfs:
cols_all |= set(d.columns)
cols_all = list(cols_all)
dfs_aligned = [d.select([F.col(c) if c in d.columns else F.lit(None).alias(c) for c in cols_all]) for d in dfs]
df = dfs_aligned[0]
for d in dfs_aligned[1:]:
df = df.unionByName(d)
return df
def normalize_colnames(df):
# 统一列名为小写+下划线
for c in df.columns:
df = df.withColumnRenamed(c, c.strip().lower().replace(" ", "_"))
return df
def parse_timestamps(df):
# 将 create_time/pay_time 解析为 TimestampType 并统一为 Asia/Shanghai
# 兼容 ISO8601(含Z/偏移)、"yyyy-MM-dd HH:mm:ss"、其他可解析格式
def to_ts(colname):
c = F.col(colname)
return F.coalesce(
F.to_timestamp(c, "yyyy-MM-dd'T'HH:mm:ssXXX"),
F.to_timestamp(c, "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
F.to_timestamp(c, "yyyy-MM-dd HH:mm:ss"),
F.to_timestamp(c) # 尝试默认
)
if "create_time" in df.columns:
df = df.withColumn("create_time_ts", to_ts("create_time"))
else:
df = df.withColumn("create_time_ts", F.lit(None).cast(T.TimestampType()))
if "pay_time" in df.columns:
df = df.withColumn("pay_time_ts", to_ts("pay_time"))
else:
df = df.withColumn("pay_time_ts", F.lit(None).cast(T.TimestampType()))
# 缺失时区或无法解析的,保持为空;下游用 create_time_ts 兜底
# 生成本地日期字段(用于汇率匹配与分区)
df = df.withColumn("event_time_ts", F.coalesce(F.col("pay_time_ts"), F.col("create_time_ts")))
df = df.withColumn("event_date", F.to_date("event_time_ts"))
df = df.withColumn("event_month", F.date_format("event_time_ts", "yyyy-MM"))
return df
def load_fx_rates(spark, fx_path):
# 需提供包含每日汇率的表,字段示例:currency, rate_to_cny, date
fx_df = (
spark.read.option("header", True).csv(fx_path)
.select(
F.lower(F.col("currency")).alias("currency"),
F.col("rate_to_cny").cast(T.DoubleType()).alias("rate_to_cny"),
F.to_date("date").alias("fx_date"),
)
.dropna(subset=["currency", "rate_to_cny", "fx_date"])
)
return fx_df
def unify_currency(df, fx_df):
# 标准化 currency 字段
df = df.withColumn("currency_norm", F.lower(F.trim(F.col("currency"))))
# 仅支持 CNY/EUR/JPY,其他标记为未知
df = df.withColumn(
"currency_norm",
F.when(F.col("currency_norm").isin("cny", "eur", "jpy"), F.col("currency_norm")).otherwise(F.lit("unknown"))
)
# 以 event_date 匹配汇率,默认按支付日
df = df.withColumn("fx_join_date", F.col("event_date"))
df = df.join(
fx_df,
on=[df.currency_norm == fx_df.currency, df.fx_join_date == fx_df.fx_date],
how="left"
).drop(fx_df.currency).drop("fx_date")
# 若为 CNY 且无汇率,则默认为 1.0;其他币种无汇率则标记缺失
df = df.withColumn(
"rate_to_cny",
F.when((F.col("currency_norm") == "cny") & F.col("rate_to_cny").isNull(), F.lit(1.0))
.otherwise(F.col("rate_to_cny"))
)
# 将金额列换算为 CNY
for c in CURRENCY_COLS:
if c in df.columns:
df = df.withColumn(f"{c}_cny",
F.when(F.col("rate_to_cny").isNotNull(), F.col(c).cast("double") * F.col("rate_to_cny"))
.otherwise(F.lit(None).cast("double")))
return df
def normalize_categories(df, spark, cat_alias_path):
# 统一大小写、空白;引入别名词典进行规范化(可按层级)
for c in ["category_lvl1", "category_lvl2", "category_lvl3"]:
if c in df.columns:
df = df.withColumn(c, F.trim(df[c]))
df = df.withColumn(c, F.when(F.col(c).isNull() | (F.length(F.col(c)) == 0), F.lit(None)).otherwise(F.col(c)))
df = df.withColumn(c, F.regexp_replace(F.col(c), r"\s+", " "))
# 加载别名映射(YAML),示例结构:
# {
# "lvl1": {"men": "Men", "male": "Men", "男装": "Men"},
# "lvl2": {"t-shirt": "T-Shirt", "tee": "T-Shirt"},
# "lvl3": {"iphone cover":"Phone Case"}
# }
if cat_alias_path:
with open(cat_alias_path, "r", encoding="utf-8") as f:
alias = yaml.safe_load(f) or {}
for level, mapping in alias.items():
if level.lower() in ("lvl1", "lvl2", "lvl3"):
                colname = f"category_{level.lower()}"
if colname in df.columns and isinstance(mapping, dict) and mapping:
# 构建广播映射
mapping_norm = {k.strip().lower(): v for k, v in mapping.items()}
bmap = spark.sparkContext.broadcast(mapping_norm)
@F.udf("string")
def map_alias(x):
if x is None:
return None
key = x.strip().lower()
return bmap.value.get(key, x)
df = df.withColumn(colname, map_alias(F.col(colname)))
# 填充缺失
for c in ["category_lvl1", "category_lvl2", "category_lvl3"]:
if c in df.columns:
df = df.withColumn(c, F.when(F.col(c).isNull(), F.lit("未知")).otherwise(F.col(c)))
return df
def normalize_regions(df, spark, region_map_path):
# 标准化省市与门店:拆分出门店名(若混杂),并映射行政区标准名
# region_mapping.csv 字段示例:raw_province, raw_city, province_std, city_std
if region_map_path:
map_df = (
spark.read.option("header", True).csv(region_map_path)
.select(
F.trim(F.col("raw_province")).alias("raw_province"),
F.trim(F.col("raw_city")).alias("raw_city"),
F.col("province_std"),
F.col("city_std")
)
.dropna(subset=["raw_province", "province_std"])
)
# 提取疑似门店:若 city/province 出现明显门店标识(例:'门店','店','store','shop')
store_regex = r"(?i).*(门店|专卖店|旗舰店|直营店|store|shop).*"
df = df.withColumn("store_name",
F.when(F.col("city").rlike(store_regex), F.col("city"))
.when(F.col("province").rlike(store_regex), F.col("province"))
.otherwise(F.lit(None)))
df = df.withColumn("province_raw", F.lower(F.trim(F.col("province"))))
df = df.withColumn("city_raw", F.lower(F.trim(F.col("city"))))
map_df = map_df.withColumn("raw_province", F.lower(F.col("raw_province")))
map_df = map_df.withColumn("raw_city", F.lower(F.col("raw_city")))
df = df.join(
map_df,
on=(df.province_raw == map_df.raw_province) & (df.city_raw == map_df.raw_city),
how="left"
).drop("raw_province", "raw_city")
df = df.withColumn("province_std",
F.coalesce(F.col("province_std"), F.col("province")))
df = df.withColumn("city_std",
F.coalesce(F.col("city_std"),
F.when(F.col("store_name").isNotNull(), F.lit("门店")).otherwise(F.col("city"))))
else:
# 无映射表时,最小化清洗:直填未知并提取门店
store_regex = r"(?i).*(门店|专卖店|旗舰店|直营店|store|shop).*"
df = df.withColumn("store_name",
F.when(F.col("city").rlike(store_regex), F.col("city"))
.when(F.col("province").rlike(store_regex), F.col("province"))
.otherwise(F.lit(None)))
df = df.withColumn("province_std", F.when(F.col("province").isNull(), F.lit("未知")).otherwise(F.col("province")))
df = df.withColumn("city_std",
F.when(F.col("city").isNull() & F.col("store_name").isNotNull(), F.lit("门店"))
.when(F.col("city").isNull(), F.lit("未知"))
.otherwise(F.col("city")))
return df
def normalize_categoricals(df):
# payment_channel、invoice_type、coupon_code等口径统一与缺失标签
norm_lower = lambda c: F.lower(F.trim(F.regexp_replace(F.col(c), r"\s+", " ")))
if "payment_channel" in df.columns:
df = df.withColumn("payment_channel_norm", norm_lower("payment_channel"))
mapping = {
"weixin": "WeChatPay", "wechat": "WeChatPay", "wechatpay": "WeChatPay", "wxpay": "WeChatPay",
"alipay": "AliPay", "ali": "AliPay",
"unionpay": "UnionPay", "bankcard": "UnionPay",
"paypal": "PayPal",
}
mapping_expr = F.create_map([F.lit(x) for kv in mapping.items() for x in kv])
df = df.withColumn(
"payment_channel_std",
F.coalesce(mapping_expr.getItem(F.col("payment_channel_norm")), F.col("payment_channel"))
)
else:
df = df.withColumn("payment_channel_std", F.lit("未知"))
for c in ["invoice_type", "coupon_code"]:
if c in df.columns:
df = df.withColumn(c, F.when(F.col(c).isNull() | (F.length(F.col(c)) == 0), F.lit("未知")).otherwise(F.col(c)))
else:
df = df.withColumn(c, F.lit("未知"))
return df
def deduplicate(df):
# 去重策略:
# - 以 (order_id, product_id) 作为明细唯一键(若无 product_id,则退化为 order_id)
# - 保留支付时间最新的记录;若支付时间缺失则用创建时间;再以金额作为次级排序
key_cols = ["order_id", "product_id"] if "product_id" in df.columns else ["order_id"]
df = df.withColumn("dedup_ts", F.coalesce(F.col("pay_time_ts"), F.col("create_time_ts")))
# 将金额(CNY)作为次级排序,避免时间并列无法区分
amount_col = F.col("order_amount_cny") if "order_amount_cny" in df.columns else F.col("order_amount").cast("double")
w = Window.partitionBy(*key_cols).orderBy(F.col("dedup_ts").desc_nulls_last(), amount_col.desc_nulls_last())
df = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")
return df
def fill_missing_numeric_with_median(df, cols):
# 使用 approxQuantile 计算中位数并填充
medians = {}
for c in cols:
if c in df.columns:
q = df.approxQuantile(c, [0.5], 1e-3)
medians[c] = q[0] if q and q[0] is not None else None
for c, m in medians.items():
if m is not None:
df = df.withColumn(c, F.when(F.col(c).isNull(), F.lit(float(m))).otherwise(F.col(c)))
return df, medians
def robust_scale(df, cols):
# Robust 标准化: (x - median) / IQR;IQR = Q3 - Q1;IQR 过小则退化为 1 以稳定
stats = {}
for c in cols:
if c in df.columns:
q1, med, q3 = df.approxQuantile(c, [0.25, 0.5, 0.75], 1e-3)
iqr = (q3 - q1) if (q1 is not None and q3 is not None) else None
stats[c] = {"median": med, "iqr": iqr}
for c, s in stats.items():
if s["median"] is None:
continue
denom = s["iqr"] if (s["iqr"] is not None and s["iqr"] > 1e-9) else 1.0
df = df.withColumn(f"{c}_rs", (F.col(c).cast("double") - F.lit(float(s["median"]))) / F.lit(float(denom)))
return df, stats
def optional_knn_impute(df, cols, feature_cols, use_knn=False, sample_rows=500000):
# 仅作为可选方案(资源敏感)。默认关闭。
# 逻辑:抽样 -> pandas -> sklearn KNNImputer -> 回写
if not use_knn:
return df
try:
import pandas as pd
from sklearn.impute import KNNImputer
except Exception:
print("未安装 pandas/scikit-learn,跳过 KNN 填充", file=sys.stderr)
return df
# 仅对存在缺失的行抽样(降低成本)
target_null_cond = None
for c in cols:
if c in df.columns:
cond = F.col(c).isNull()
target_null_cond = cond if target_null_cond is None else (target_null_cond | cond)
if target_null_cond is None:
return df
df_missing = df.filter(target_null_cond).limit(sample_rows)
key_cols = ["order_id", "product_id"] if "product_id" in df.columns else ["order_id"]
select_cols = key_cols + list(set(cols + feature_cols))
df_pd = df_missing.select(*[c for c in select_cols if c in df_missing.columns]).toPandas()
# 若样本过小或无缺失,直接返回
if df_pd.empty:
return df
# KNN 拟合
imputer = KNNImputer(n_neighbors=5, weights="uniform")
fit_cols = [c for c in cols + feature_cols if c in df_pd.columns]
# 数值化类别特征(如有)以供 KNN 使用(简单编码,生产建议独热或目标编码)
for c in fit_cols:
if df_pd[c].dtype == object:
df_pd[c] = df_pd[c].astype("category").cat.codes
imputed = imputer.fit_transform(df_pd[fit_cols])
df_pd_imputed = df_pd.copy()
for idx, c in enumerate(fit_cols):
if c in cols: # 仅回写目标列
df_pd_imputed[c] = imputed[:, idx]
    # 回写:按键一次性左连接,再用 coalesce 更新目标列(避免逐列 join 造成键列重复)
    spark = SparkSession.builder.getOrCreate()
    imputed_sdf = spark.createDataFrame(df_pd_imputed[key_cols + cols])
    renamed = imputed_sdf.select(*key_cols, *[F.col(c).alias(f"{c}__imputed") for c in cols])
    df = df.join(renamed, on=key_cols, how="left")
    for c in cols:
        df = df.withColumn(c, F.coalesce(F.col(f"{c}__imputed"), F.col(c))).drop(f"{c}__imputed")
return df
def cast_numeric(df):
for c in NUMERIC_COLS_BASE:
if c in df.columns:
df = df.withColumn(c, F.col(c).cast("double"))
return df
def finalize_columns(df):
# 输出字段组织:ID、时间、地区、类别、支付、金额(原始与CNY与标准化)、数量(原始与标准化)
out_cols = []
for c in ["order_id", "user_id", "product_id"]:
if c in df.columns: out_cols.append(c)
out_cols += [
"create_time_ts", "pay_time_ts", "event_time_ts", "event_date", "event_month",
"province_std", "city_std", "store_name",
"category_lvl1", "category_lvl2", "category_lvl3",
"payment_channel_std", "invoice_type", "coupon_code",
"currency_norm", "rate_to_cny"
]
    for c in ["quantity", "unit_price", "discount_amount", "order_amount"]:
        if f"{c}_cny" in df.columns:
            out_cols.extend([c, f"{c}_cny"])
        elif c in df.columns:
            out_cols.append(c)
        # 标准化列可能基于原始列或 CNY 列生成(如 order_amount_cny_rs),两者都收集
        for rs_col in (f"{c}_rs", f"{c}_cny_rs"):
            if rs_col in df.columns:
                out_cols.append(rs_col)
# 去除不存在列与去重
out_cols = [c for c in out_cols if c in df.columns]
df = df.select(*out_cols)
return df
# ----------------------------
# 主流程
# ----------------------------
def main(args):
spark = start_spark()
df = read_sources(spark, args.csv_paths, args.parquet_paths)
df = normalize_colnames(df)
# 基础类型处理
df = cast_numeric(df)
df = parse_timestamps(df)
# 类别清洗与统一
df = normalize_categories(df, spark, args.cat_alias_path)
df = normalize_regions(df, spark, args.region_map_path)
df = normalize_categoricals(df)
# 货币统一
fx_df = load_fx_rates(spark, args.fx_path)
df = unify_currency(df, fx_df)
# 缺失值处理:数值型 -> 中位数;类别型 -> "未知"
# 数值列扩展为使用 CNY 列做标准化来源
numeric_cols = ["quantity"]
for c in CURRENCY_COLS:
if f"{c}_cny" in df.columns:
numeric_cols.append(f"{c}_cny")
elif c in df.columns:
numeric_cols.append(c)
# 先做 KNN(可选,默认关)
# 注意:KNN成本较高,仅对少量缺失行抽样回写;生产建议针对高价值字段与小缺失率列开启
df = optional_knn_impute(
df,
cols=[c for c in numeric_cols if c in df.columns],
feature_cols=["province_std", "city_std", "category_lvl1", "category_lvl2", "category_lvl3"],
use_knn=args.use_knn_impute,
sample_rows=args.knn_sample_rows
)
# 中位数填充(稳妥,覆盖所有数值列)
df, medians = fill_missing_numeric_with_median(df, [c for c in numeric_cols if c in df.columns])
# 去重(按订单/商品保留最新支付记录)
df = deduplicate(df)
# 稳健标准化(Robust)
df, stats = robust_scale(df, [c for c in numeric_cols if c in df.columns])
# 最终列与写出
df = finalize_columns(df)
(
df.write
.mode("overwrite")
.partitionBy("event_month")
.parquet(args.output_path)
)
# 产出指标日志(中位数/IQR等,便于审计)
stats_rows = []
for c, s in stats.items():
stats_rows.append((c, float(s.get("median")) if s.get("median") is not None else None,
float(s.get("iqr")) if s.get("iqr") is not None else None))
if stats_rows:
stats_df = spark.createDataFrame(stats_rows, schema=["column", "median", "iqr"])
stats_df.coalesce(1).write.mode("overwrite").option("header", True).csv(args.output_path + "_stats")
print("清洗与标准化完成:", args.output_path)
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--csv_paths", type=str, nargs="*", default=[], help="CSV 路径(可多)")
parser.add_argument("--parquet_paths", type=str, nargs="*", default=[], help="Parquet 路径(可多)")
parser.add_argument("--fx_path", type=str, required=True, help="汇率表路径(CSV),字段:currency,rate_to_cny,date")
parser.add_argument("--cat_alias_path", type=str, default=None, help="类别别名映射(YAML)")
parser.add_argument("--region_map_path", type=str, default=None, help="行政区映射表(CSV)")
parser.add_argument("--output_path", type=str, required=True, help="输出 Parquet 目录")
parser.add_argument("--use_knn_impute", type=lambda x: str(x).lower() == "true", default=False, help="是否启用KNN填充")
parser.add_argument("--knn_sample_rows", type=int, default=500000, help="KNN抽样上限")
args = parser.parse_args()
main(args)
Implementation details and key points
- Timezone unification: the Spark session timezone is forced to Asia/Shanghai; create_time/pay_time are parsed against several common formats, and event_time_ts/event_date/event_month are derived for FX matching and partitioning.
- Deduplication: order lines are keyed by (order_id, product_id), falling back to order_id; the record with the latest pay time (then create time) is kept, with the CNY amount as a tie-breaker.
- Multi-currency and FX: currencies are normalized to cny/eur/jpy, joined to a daily FX table on event_date, and amount columns are converted to *_cny; CNY defaults to a rate of 1.0 when no rate row exists.
- Missing values: numeric columns are filled with approximate medians (optionally preceded by sampled KNN imputation); categorical columns are filled with "未知".
- Robust scaling: numeric columns are standardized as (x - median) / IQR, with the denominator falling back to 1 when the IQR is near zero; the medians and IQRs are exported for audit.
- Category and region normalization: category levels are trimmed and mapped through an alias dictionary (YAML); provinces and cities are mapped to standard names via a lookup table, and store names are extracted from mixed fields.
- Output: results are written as Parquet partitioned by event_month, plus a companion "_stats" CSV next to the output path with the scaling statistics.
- Caveats and extensions: the FX, region, and category mapping tables must be maintained externally; shuffle partitions and memory overhead should be tuned to the cluster; the KNN path is sampled and disabled by default because of its cost.
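The script reads three small reference tables whose column names are fixed by the code above; the file names and concrete values below are only illustrative:

# fx_rates.csv - daily FX table read by load_fx_rates (rates are made up)
currency,rate_to_cny,date
eur,7.85,2024-01-02
jpy,0.050,2024-01-02

# region_mapping.csv - lookup read by normalize_regions
raw_province,raw_city,province_std,city_std
广东,深圳,广东省,深圳市

# category_alias.yaml - alias dictionary read by normalize_categories
lvl1:
  男装: Men
  male: Men
lvl2:
  tee: T-Shirt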
If you share sample structures for your actual mapping files (category aliases, administrative regions, FX rates), the script and accompanying unit tests can be completed against them.
Below is an end-to-end Python script for standardizing hourly multi-channel ad delivery logs: it unifies timezones and naming, aligns region granularity, normalizes currencies, handles anomalous clicks and conversions, builds derived metrics (CTR, CVR, CPC, and so on), scales numeric columns to [0, 1] with Min-Max, and outputs before/after distribution plots. The missing-value strategy is configurable, with optional hourly interpolation.
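The derived metrics are simple ratios; a worked example for one hourly row with 10,000 impressions, 200 clicks, 10 conversions, cost 50 and revenue 300 (already in the base currency): CTR = 200 / 10,000 = 2%, CVR = 10 / 200 = 5%, CPC = 50 / 200 = 0.25, CPM = 50 / (10,000 / 1000) = 5, CPA = 50 / 10 = 5, ROAS = 300 / 50 = 6.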
Key notes:
- Rows whose timestamps cannot be parsed are dropped; the remaining timestamps are converted to --target_tz and floored to the hour.
- Channel names and regions are normalized through user-supplied mapping files; cost and revenue are converted to --base_currency using a daily FX table, and rows without an FX rate are removed.
- Suspected bot traffic is handled per (channel, std_region, device_type, os) group with a MAD-based upper bound on CTR/CVR, either capped or removed.
Dependencies: pandas, numpy, python-dateutil, pytz, scikit-learn, matplotlib, seaborn (plus pyarrow for Parquet I/O).
Input/output conventions:
- Required input columns: channel, timestamp, region, device_type, os, impressions, clicks, cost, conversions, revenue, currency.
- Reference files: fx_rates (date, currency, rate_to_base), region_lookup (raw_region, std_region or province), and two JSON maps for channel naming and channel timezones.
- Outputs: cleaned.parquet, optional scaled.parquet, scaler.json with the Min-Max parameters, and a plots/ directory with before/after distribution charts.
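The two JSON maps and the reference tables are expected to look roughly like the following; only the key/column layout matters, and every concrete name and rate here is a made-up example:

# channel_map.json - raw channel name -> standard name
{"fb": "Facebook", "facebook_ads": "Facebook", "gdn": "GoogleAds", "google ads": "GoogleAds"}

# channel_tz_map.json - standard channel name -> timezone assumed for naive timestamps
{"Facebook": "UTC", "GoogleAds": "America/Los_Angeles"}

# fx_rates.csv
date,currency,rate_to_base
2024-01-02,USD,7.10
2024-01-02,JPY,0.050

# region_lookup.csv
raw_region,std_region
guangdong-shenzhen,广东省
CA,California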
#!/usr/bin/env python3
import os
import json
import argparse
import warnings
from typing import List, Dict, Optional

import numpy as np
import pandas as pd
from dateutil import parser as dtparser
import pytz
from sklearn.preprocessing import MinMaxScaler
import matplotlib.pyplot as plt
import seaborn as sns
warnings.filterwarnings("ignore")
def ensure_dir(path: str):
    os.makedirs(path, exist_ok=True)
def load_dataframe(path: str, dtype_overrides: Optional[Dict[str, str]] = None) -> pd.DataFrame:
    ext = os.path.splitext(path)[-1].lower()
    if ext in [".parquet", ".pq"]:
        df = pd.read_parquet(path)
    elif ext in [".csv", ".txt"]:
        df = pd.read_csv(path, low_memory=False)
    else:
        raise ValueError(f"Unsupported file extension for: {path}")
    if dtype_overrides:
        for col, dt in dtype_overrides.items():
            if col in df.columns:
                try:
                    df[col] = df[col].astype(dt)
                except Exception:
                    pass
    return df
def parse_timestamp(ts):
    # 解析时间戳(可能包含或不包含时区)
    if pd.isna(ts):
        return pd.NaT
    try:
        return dtparser.parse(str(ts))
    except Exception:
        return pd.NaT
def convert_timezone(series_ts: pd.Series, channel: pd.Series,
                     channel_tz_map: Dict[str, str], target_tz: str) -> pd.Series:
    """
    将 timestamp 列按 channel_tz_map 解释为对应时区,再统一转换到 target_tz。
    - 若 timestamp 已带时区,则按原时区转换到 target_tz。
    - 若不带时区,且 channel 在映射中,则视为该时区;否则默认 UTC。
    """
    target = pytz.timezone(target_tz)
def _convert(ts, ch):
if pd.isna(ts):
return pd.NaT
# 已有 tzinfo
if getattr(ts, "tzinfo", None) is not None:
try:
return ts.astimezone(target)
except Exception:
return pd.NaT
# 无 tzinfo,依据 channel 映射假定原时区
src_tz_name = channel_tz_map.get(str(ch), "UTC")
try:
src_tz = pytz.timezone(src_tz_name)
except Exception:
src_tz = pytz.UTC
try:
localized = src_tz.localize(ts)
return localized.astimezone(target)
except Exception:
return pd.NaT
return pd.to_datetime([_convert(t, c) for t, c in zip(series_ts, channel)], errors="coerce")
def normalize_channels(series: pd.Series, channel_map: Dict[str, str]) -> pd.Series:
    def _norm(x):
        if pd.isna(x):
            return "未知"
        key = str(x).strip().lower()
        # 尝试映射(大小写、去空格处理)
        for k, v in channel_map.items():
            if key == str(k).strip().lower():
                return v
        return x  # 未覆盖的保留原值
    return series.apply(_norm)
def map_regions(df: pd.DataFrame, region_lookup: pd.DataFrame,
                raw_col: str = "region", std_col: str = "std_region") -> pd.DataFrame:
    if raw_col not in df.columns:
        return df
    if std_col not in region_lookup.columns:
        raise ValueError(f"region_lookup 必须包含列: {std_col}")
    # 映射表中的原始地区列通常叫 raw_region,这里对齐为 raw_col 以便 merge
    lookup = region_lookup.copy()
    if raw_col not in lookup.columns and "raw_region" in lookup.columns:
        lookup = lookup.rename(columns={"raw_region": raw_col})
    lookup = lookup[[raw_col, std_col]].drop_duplicates()
    df = df.merge(lookup, how="left", on=raw_col)
    df[std_col] = df[std_col].fillna("未知")
    return df
def convert_currency(df: pd.DataFrame, fx_df: pd.DataFrame, base_currency: str, date_col: str = "date") -> pd.DataFrame:
    """
    依据 currency + date 合并汇率,将 cost/revenue 转换为 base 货币。
    fx_df 需要列: date(YYYY-MM-DD), currency, rate_to_base(1单位外币 -> base 的倍数)
    """
    if "currency" not in df.columns:
        raise ValueError("缺少 currency 列")
    df[date_col] = pd.to_datetime(df["timestamp"].dt.date)
    fx_df = fx_df.copy()
    fx_df[date_col] = pd.to_datetime(fx_df[date_col])
    fx_df["currency"] = fx_df["currency"].astype(str).str.upper()
    df["currency"] = df["currency"].astype(str).str.upper()
    merged = df.merge(fx_df, how="left", left_on=[date_col, "currency"], right_on=[date_col, "currency"])
    # rate_to_base 缺失处理:若本身就是 base 货币,则设为 1;否则保持缺失并后续处理
    merged["rate_to_base"] = np.where(
        merged["currency"] == base_currency.upper(), 1.0, merged["rate_to_base"]
    )
    # 对缺失的汇率进行处理(可以选择删除或填充为1,但建议删除以避免误差)
    missing_fx = merged["rate_to_base"].isna().sum()
    if missing_fx > 0:
        # 保守处理:删除缺少汇率的行
        merged = merged[~merged["rate_to_base"].isna()]
    merged["cost_base"] = merged["cost"] * merged["rate_to_base"]
    merged["revenue_base"] = merged["revenue"] * merged["rate_to_base"]
    return merged.drop(columns=["rate_to_base"])
def compute_derived_metrics(df: pd.DataFrame) -> pd.DataFrame:
    # 防止除零
    eps = 1e-12
    # 派生指标
    df["ctr"] = np.where(df["impressions"] > 0, df["clicks"] / df["impressions"], 0.0)
    df["cvr"] = np.where(df["clicks"] > 0, df["conversions"] / df["clicks"], 0.0)
    df["cpc"] = np.where(df["clicks"] > 0, df["cost_base"] / df["clicks"], 0.0)
    df["cpm"] = np.where(df["impressions"] > 0, df["cost_base"] / (df["impressions"] / 1000.0), 0.0)
    df["cpa"] = np.where(df["conversions"] > 0, df["cost_base"] / np.maximum(df["conversions"], eps), 0.0)
    df["roas"] = np.where(df["cost_base"] > 0, df["revenue_base"] / df["cost_base"], 0.0)
    return df
def robust_bot_filter(df: pd.DataFrame, group_cols: List[str], k_mad: float = 5.0, mode: str = "cap") -> pd.DataFrame:
    """
    使用分组 + MAD 对 CTR/CVR 去极值或删除疑似机器人流量。
    - mode = "cap": 将超过上界的 CTR/CVR 截断到上界
    - mode = "remove": 删除超过上界的行
    同时清理 clicks > impressions,conversions > clicks 等逻辑异常。
    """
    # 逻辑异常先处理
    df = df[df["impressions"] >= 0]
    df = df[df["clicks"] >= 0]
    df = df[df["conversions"] >= 0]
    df = df[df["clicks"] <= df["impressions"]]
    df = df[df["conversions"] <= df["clicks"]]
# 计算当前 ctr/cvr(若前面未计算)
if "ctr" not in df.columns or "cvr" not in df.columns:
df = compute_derived_metrics(df)
# 分组计算 robust 上界
def _mad_cap(sub: pd.DataFrame, col: str):
x = sub[col].values
med = np.median(x)
mad = np.median(np.abs(x - med)) + 1e-12
ub = med + k_mad * 1.4826 * mad # 正态等效
return ub
# 计算上界表
ub_ctr = df.groupby(group_cols, observed=True).apply(lambda g: _mad_cap(g, "ctr")).rename("ctr_ub")
ub_cvr = df.groupby(group_cols, observed=True).apply(lambda g: _mad_cap(g, "cvr")).rename("cvr_ub")
bounds = pd.concat([ub_ctr, ub_cvr], axis=1).reset_index()
df = df.merge(bounds, how="left", on=group_cols)
if mode == "cap":
df["ctr"] = np.minimum(df["ctr"], df["ctr_ub"].fillna(df["ctr"]))
df["cvr"] = np.minimum(df["cvr"], df["cvr_ub"].fillna(df["cvr"]))
elif mode == "remove":
df = df[(df["ctr"] <= df["ctr_ub"]) | df["ctr_ub"].isna()]
df = df[(df["cvr"] <= df["cvr_ub"]) | df["cvr_ub"].isna()]
else:
raise ValueError("bot_filter mode 必须是 'cap' 或 'remove'")
df = df.drop(columns=["ctr_ub", "cvr_ub"], errors="ignore")
return df
def handle_missing_values(df: pd.DataFrame, cat_cols: List[str], num_cols: List[str],
                          num_strategy: str = "fixed", fixed_value: float = 0.0,
                          interpolate: bool = False,
                          resample_group_cols: Optional[List[str]] = None) -> pd.DataFrame:
    """
    缺失值策略:
    - 类别:填充 '未知'
    - 数值:num_strategy in ['fixed','interpolate','drop']
      - fixed: 用 fixed_value 填充
      - interpolate: 若 interpolate=True,则对指定分组按小时重采样 + 线性插值
      - drop: 删除含缺失的行
    """
    # 类别填充
    for c in cat_cols:
        if c in df.columns:
            df[c] = df[c].fillna("未知")
if num_strategy == "drop":
return df.dropna(subset=num_cols)
if num_strategy == "fixed":
for c in num_cols:
if c in df.columns:
df[c] = df[c].fillna(fixed_value)
return df
if num_strategy == "interpolate":
if not interpolate:
# 仅对现有缺失在同一索引上做线性插值(非重采样)
for c in num_cols:
if c in df.columns:
df[c] = df[c].interpolate(method="linear", limit_direction="both")
return df
# 按组重采样到小时,并插值
if resample_group_cols is None or len(resample_group_cols) == 0:
raise ValueError("interpolate=True 时需要 resample_group_cols 指定分组键")
# 仅保留必要列以降低内存
keep_cols = list(set(resample_group_cols + ["timestamp"] + num_cols))
other_cols = [c for c in df.columns if c not in keep_cols]
# 先排序以保证重采样有效
df = df.sort_values(["timestamp"] + resample_group_cols)
# 分组处理
def _resample_group(g: pd.DataFrame):
g = g.set_index("timestamp").asfreq("H")
# 插值
for c in num_cols:
if c in g.columns:
g[c] = g[c].interpolate(method="linear", limit_direction="both")
# 重置索引
g = g.reset_index()
        # 恢复分组键(重采样新增的行其键值为 NaN,用段内前后值回填)
        for k in resample_group_cols:
            if k in g.columns:
                g[k] = g[k].ffill().bfill()
return g
df_interp = (
df[keep_cols]
.groupby(resample_group_cols, dropna=False, observed=True, group_keys=False)
.apply(_resample_group)
)
# 合并回其他列(其他列不可随时间插值,使用最近值或未知)
df_interp = df_interp.merge(
df[resample_group_cols + ["timestamp"] + other_cols],
how="left",
on=resample_group_cols + ["timestamp"]
)
# 对类别型新增缺失再补一次
for c in other_cols:
if df_interp[c].dtype == "O":
df_interp[c] = df_interp[c].fillna("未知")
return df_interp
raise ValueError("num_strategy 必须是 ['fixed','interpolate','drop'] 之一")
def minmax_scale(df: pd.DataFrame, cols: List[str]) -> (pd.DataFrame, Dict[str, Dict[str, float]]):
    scaler = MinMaxScaler(feature_range=(0, 1), clip=False)
    arr = df[cols].astype(float).values
    scaler.fit(arr)
    scaled = scaler.transform(arr)
    df_scaled = df.copy()
    df_scaled[cols] = scaled
    # 导出缩放参数
    params = {}
    for i, c in enumerate(cols):
        params[c] = {"min": float(scaler.data_min_[i]), "max": float(scaler.data_max_[i])}
    return df_scaled, params
def plot_distributions(sample_before: pd.DataFrame, sample_after: pd.DataFrame, cols: List[str], out_dir: str):
    ensure_dir(out_dir)
    sns.set(style="whitegrid")
    for c in cols:
        plt.figure(figsize=(8, 5))
        sns.kdeplot(sample_before[c].dropna(), label="before", fill=True, alpha=0.3, color="#1f77b4")
        sns.kdeplot(sample_after[c].dropna(), label="after", fill=True, alpha=0.3, color="#ff7f0e")
        plt.title(f"Distribution Comparison: {c}")
        plt.legend()
        plt.tight_layout()
        plt.savefig(os.path.join(out_dir, f"dist_{c}.png"), dpi=150)
        plt.close()
def main():
    parser = argparse.ArgumentParser(description="多渠道广告日志标准化(时区/命名/地区/汇率/异常/指标/Min-Max/分布图)")
    parser.add_argument("--input", required=True, help="输入数据文件(.parquet 或 .csv)")
    parser.add_argument("--output_dir", required=True, help="输出目录")
    parser.add_argument("--fx_rates", required=True, help="汇率表文件(.parquet 或 .csv),列:date,currency,rate_to_base")
    parser.add_argument("--region_lookup", required=True, help="地区映射表文件(.parquet 或 .csv),列:raw_region,std_region")
    parser.add_argument("--channel_map", required=True, help="渠道命名映射 JSON 文件")
    parser.add_argument("--channel_tz_map", required=True, help="渠道->时区 映射 JSON 文件(例:'UTC','Asia/Shanghai')")
    parser.add_argument("--base_currency", default="CNY", help="目标基准货币(例:CNY 或 USD)")
    parser.add_argument("--target_tz", default="UTC", help="统一目标时区(例:UTC 或 Asia/Shanghai)")
    parser.add_argument("--num_strategy", default="fixed", choices=["fixed", "interpolate", "drop"], help="数值缺失策略")
    parser.add_argument("--fixed_fill_value", type=float, default=0.0, help="数值缺失固定填充值")
    parser.add_argument("--interpolate_resample", action="store_true", help="插值是否按小时重采样(需指定 resample_group_cols)")
    parser.add_argument("--resample_group_cols", default="channel,std_region,device_type,os", help="插值重采样分组列,逗号分隔")
    parser.add_argument("--bot_filter_mode", default="cap", choices=["cap", "remove"], help="机器人流量处理方式:截断或删除")
    parser.add_argument("--bot_k_mad", type=float, default=5.0, help="MAD 上界系数")
    parser.add_argument("--sample_for_plots", type=int, default=300000, help="绘图抽样行数")
    parser.add_argument("--save_scaled", action="store_true", help="是否输出缩放后的数据")
    args = parser.parse_args()
ensure_dir(args.output_dir)
plots_dir = os.path.join(args.output_dir, "plots")
ensure_dir(plots_dir)
# 读取映射配置
with open(args.channel_map, "r", encoding="utf-8") as f:
channel_map = json.load(f)
with open(args.channel_tz_map, "r", encoding="utf-8") as f:
channel_tz_map = json.load(f)
# 读取主数据
dtype_overrides = {
"campaign_id": "string",
"ad_group_id": "string",
"creative_id": "string",
"channel": "string",
"device_type": "string",
"os": "string",
"region": "string",
"currency": "string",
}
df = load_dataframe(args.input, dtype_overrides=dtype_overrides)
# 基本列检查
required_cols = ["channel","timestamp","region","device_type","os",
"impressions","clicks","cost","conversions","revenue","currency"]
missing_cols = [c for c in required_cols if c not in df.columns]
if missing_cols:
raise ValueError(f"缺少必要列: {missing_cols}")
# 1) 时间戳解析与时区统一
df["timestamp"] = df["timestamp"].apply(parse_timestamp)
df["channel"] = normalize_channels(df["channel"], channel_map=channel_map)
df["timestamp"] = convert_timezone(df["timestamp"], df["channel"], channel_tz_map, args.target_tz)
# 舍弃无法解析的时间行
df = df[~df["timestamp"].isna()]
# 对齐到小时起始
df["timestamp"] = df["timestamp"].dt.floor("H")
# 2) 地区映射到标准粒度
region_lookup = load_dataframe(args.region_lookup)
# 兼容不同列名
if "raw_region" not in region_lookup.columns:
region_lookup = region_lookup.rename(columns={"region":"raw_region"})
if "std_region" not in region_lookup.columns:
# 若无 std_region 列,尝试使用 province 列作为标准粒度
if "province" in region_lookup.columns:
region_lookup = region_lookup.rename(columns={"province":"std_region"})
else:
raise ValueError("region_lookup 需要提供 std_region 或 province 列")
df = map_regions(df, region_lookup, raw_col="region", std_col="std_region")
# 3) 汇率统一到 base_currency
fx_df = load_dataframe(args.fx_rates)
# 校验列
for col in ["date","currency","rate_to_base"]:
if col not in fx_df.columns:
raise ValueError("fx_rates 需要列: date,currency,rate_to_base")
df = convert_currency(df, fx_df, base_currency=args.base_currency)
# 4) 基础数值类型安全转换
for c in ["impressions","clicks","conversions"]:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(np.int64)
for c in ["cost","revenue","cost_base","revenue_base"]:
df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0).astype(float)
# 5) 缺失值处理(类别 -> '未知';数值 -> 固定/插值/删除)
cat_cols = ["campaign_id","ad_group_id","creative_id","channel","device_type","os","region","std_region","currency"]
num_cols = ["impressions","clicks","conversions","cost","revenue","cost_base","revenue_base"]
resample_cols = [c.strip() for c in args.resample_group_cols.split(",") if c.strip()]
df = handle_missing_values(
df, cat_cols=cat_cols, num_cols=num_cols,
num_strategy=args.num_strategy,
fixed_value=args.fixed_fill_value,
interpolate=args.interpolate_resample,
resample_group_cols=resample_cols
)
# 6) 构造派生指标
df = compute_derived_metrics(df)
# 7) 机器人流量过滤(按 channel,std_region,device_type,os 分组)
bot_group_cols = ["channel","std_region","device_type","os"]
df = robust_bot_filter(df, group_cols=bot_group_cols, k_mad=args.bot_k_mad, mode=args.bot_filter_mode)
    # 8) 为绘图准备 before/after 抽样
    #    "after" 使用清洗+过滤后的 df;"before" 需要过滤前的版本。
    #    为保证脚本独立性,这里从原始输入重新执行一遍轻量流程(时间解析/地区映射/汇率/派生指标),
    #    作为 "before" 的近似样本;生产环境建议在过滤前持久化快照后再抽样,避免重复计算。
df_raw = load_dataframe(args.input, dtype_overrides=dtype_overrides)
df_raw["timestamp"] = df_raw["timestamp"].apply(parse_timestamp)
df_raw["channel"] = normalize_channels(df_raw["channel"], channel_map=channel_map)
df_raw["timestamp"] = convert_timezone(df_raw["timestamp"], df_raw["channel"], channel_tz_map, args.target_tz)
df_raw = df_raw[~df_raw["timestamp"].isna()]
df_raw["timestamp"] = df_raw["timestamp"].dt.floor("H")
df_raw = map_regions(df_raw, region_lookup, raw_col="region", std_col="std_region")
df_raw = convert_currency(df_raw, fx_df, base_currency=args.base_currency)
for c in ["impressions","clicks","conversions"]:
df_raw[c] = pd.to_numeric(df_raw[c], errors="coerce").fillna(0).astype(np.int64)
for c in ["cost","revenue","cost_base","revenue_base"]:
df_raw[c] = pd.to_numeric(df_raw[c], errors="coerce").fillna(0.0).astype(float)
df_raw = compute_derived_metrics(df_raw)
# 抽样绘图
n = args.sample_for_plots
rng = np.random.default_rng(2024)
if len(df_raw) > n:
sample_before = df_raw.sample(n=n, random_state=2024)
else:
sample_before = df_raw.copy()
if len(df) > n:
sample_after = df.sample(n=n, random_state=2024)
else:
sample_after = df.copy()
# 9) Min-Max 归一化(缩放到 [0,1])
scale_cols = [
"impressions","clicks","conversions",
"cost_base","revenue_base",
"ctr","cvr","cpc","cpm","cpa","roas"
]
# 缩放前备份用于绘图
# 仅对 after 再缩放
df_scaled, scaler_params = minmax_scale(df, cols=scale_cols)
# 10) 输出
cleaned_path = os.path.join(args.output_dir, "cleaned.parquet")
df.to_parquet(cleaned_path, index=False)
with open(os.path.join(args.output_dir, "scaler.json"), "w", encoding="utf-8") as f:
json.dump({"feature_range": [0,1], "params": scaler_params}, f, ensure_ascii=False, indent=2)
if args.save_scaled:
scaled_path = os.path.join(args.output_dir, "scaled.parquet")
df_scaled.to_parquet(scaled_path, index=False)
# 11) 分布对比图(before vs after)
plot_cols = ["impressions","clicks","cost_base","revenue_base","ctr","cvr","cpc","cpm","cpa","roas"]
plot_distributions(sample_before[plot_cols], sample_after[plot_cols], plot_cols, plots_dir)
print(f"完成。清洗输出: {cleaned_path}")
if args.save_scaled:
print(f"缩放输出: {scaled_path}")
print(f"缩放参数: {os.path.join(args.output_dir, 'scaler.json')}")
    print(f"分布图目录: {plots_dir}")


if __name__ == "__main__":
    main()
Usage example:
Files to prepare: raw_logs.parquet (the hourly ad log), fx_rates.csv, region_lookup.csv, channel_map.json, and channel_tz_map.json, matching the layouts described above.
Run (fixed-value fill, cap bot traffic, write the scaled output):
python standardize_ads.py \
  --input raw_logs.parquet \
  --output_dir out_ads \
  --fx_rates fx_rates.csv \
  --region_lookup region_lookup.csv \
  --channel_map channel_map.json \
  --channel_tz_map channel_tz_map.json \
  --base_currency CNY \
  --target_tz Asia/Shanghai \
  --num_strategy fixed \
  --fixed_fill_value 0 \
  --bot_filter_mode cap \
  --bot_k_mad 5 \
  --save_scaled
Run (hourly resampling with linear interpolation for missing numerics, remove outliers):
python standardize_ads.py \
  --input raw_logs.parquet \
  --output_dir out_ads \
  --fx_rates fx_rates.csv \
  --region_lookup region_lookup.csv \
  --channel_map channel_map.json \
  --channel_tz_map channel_tz_map.json \
  --base_currency USD \
  --target_tz UTC \
  --num_strategy interpolate \
  --interpolate_resample \
  --resample_group_cols channel,std_region,device_type,os \
  --bot_filter_mode remove \
  --bot_k_mad 6 \
  --save_scaled
Implementation details and caveats:
- Rows with unparseable timestamps are dropped; the remaining timestamps are floored to the hour, so the output is an hourly log in the target timezone.
- Rows whose currency has no FX rate for that day are removed rather than guessed; CTR/CVR/CPC/CPM/CPA/ROAS are recomputed after cleaning.
- Bot filtering applies a per-group MAD upper bound to CTR and CVR and either caps or removes offending rows (see the sketch after this list); logically impossible rows (clicks > impressions, conversions > clicks) are always dropped.
- Min-Max parameters are persisted to scaler.json so the same scaling can be replayed on new data; the before/after plots are density estimates on samples, not full-data histograms.
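As a sanity check on the bound itself, here is a minimal standalone sketch of the same MAD arithmetic used in robust_bot_filter, on toy CTR values with k_mad = 5:

import numpy as np

ctr = np.array([0.010, 0.012, 0.011, 0.013, 0.250])   # one suspicious member in the group
med = np.median(ctr)                                   # 0.012
mad = np.median(np.abs(ctr - med))                     # 0.001
upper = med + 5.0 * 1.4826 * mad                       # about 0.0194
capped = np.minimum(ctr, upper)                        # the 0.25 outlier is capped (mode="cap")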
If the pipeline needs to become a strict two-pass streaming job (first collect min/max, then scale), or if you want higher throughput with Dask or Polars, the data loading and groupby steps in this framework can be swapped out accordingly.
Below is an end-to-end standardization script for PySpark, targeting the manufacturing sensor data described earlier (second-level readings, roughly 120 million rows, 22 columns). It unifies timezones and units, performs robust spike detection and repair, resamples to one-minute granularity, and applies a quantile transform to a uniform distribution, with engineering-level handling of high frequency, heavy tails, block missingness, duplicate records, and cross-day clock jumps.
Notes and assumptions:
- Input columns are expected to include device_id, timestamp, temp, status, and optionally vibration, rpm, current, voltage, pressure, batch_id, location; missing optional columns are created as nulls.
- Raw timestamps are interpreted in the timezone given by --input-tz and converted to UTC; temperatures whose device-level median exceeds 200 are treated as Kelvin and converted to Celsius.
- Input may be Parquet (recommended) or CSV; output is Parquet partitioned by device_id and ds, with the quantile breakpoints saved under _meta/.
Usage (example):
spark-submit \
  --conf spark.sql.adaptive.enabled=true \
  --conf spark.sql.execution.arrow.pyspark.enabled=true \
  standardize_pipeline.py \
  --input s3://bucket/path/input/ \
  --output s3://bucket/path/output/ \
  --input-tz "Asia/Shanghai" \
  --timestamp-format "yyyy-MM-dd HH:mm:ss" \
  --quantile-bins 1024 \
  --resample-rule "1min" \
  --enable-knn false
#!/usr/bin/env python3
import argparse
import math
from typing import List, Dict, Tuple

import numpy as np
import pandas as pd

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, DoubleType, LongType
)
from pyspark.sql.functions import pandas_udf, PandasUDFType
def parse_args():
    p = argparse.ArgumentParser(description="制造业秒级传感器数据标准化管道")
    p.add_argument("--input", required=True, help="输入路径(支持 parquet/csv)")
    p.add_argument("--output", required=True, help="输出路径(Parquet,按 device_id 和 ds 分区)")
    p.add_argument("--input-tz", default="UTC", help="输入时间戳的时区(例如 Asia/Shanghai)")
    p.add_argument("--timestamp-format", default=None, help="可选:原始时间戳字符串格式(如 yyyy-MM-dd HH:mm:ss)")
    p.add_argument("--resample-rule", default="1min", help="重采样频率(默认 1min)")
    p.add_argument("--quantile-bins", type=int, default=1024, help="分位数变换分段数(建议 256~2048)")
    p.add_argument("--rel-error", type=float, default=1e-3, help="approxQuantile 相对误差")
    p.add_argument("--enable-knn", type=lambda x: x.lower() == 'true', default=False, help="是否启用KNN补齐剩余缺失(默认 false)")
    return p.parse_args()
def build_spark():
    spark = (
        SparkSession.builder
        .appName("manufacturing-sensor-standardization")
        .config("spark.sql.adaptive.enabled", "true")
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.sql.shuffle.partitions", "400")
        .getOrCreate()
    )
    return spark
def map_status():
    # 统一状态标签
    mapping = {
        "normal": "NORMAL", "ok": "NORMAL", "running": "NORMAL", "run": "NORMAL", "on": "NORMAL", "正常": "NORMAL",
        "maint": "MAINTENANCE", "maintenance": "MAINTENANCE", "service": "MAINTENANCE", "维护": "MAINTENANCE",
        "down": "DOWNTIME", "stop": "DOWNTIME", "stopped": "DOWNTIME", "shutdown": "DOWNTIME", "停机": "DOWNTIME",
        "unknown": "UNKNOWN", "na": "UNKNOWN", "n/a": "UNKNOWN", "unk": "UNKNOWN", "": "UNKNOWN",
    }
    # 使用内置 map 表达式完成映射(规避 Python UDF 开销),未命中时统一为 UNKNOWN
    mapping_expr = F.create_map([F.lit(x) for kv in mapping.items() for x in kv])

    def status_expr(col):
        c = F.lower(F.trim(F.coalesce(col.cast(StringType()), F.lit("unknown"))))
        return F.coalesce(mapping_expr.getItem(c), F.lit("UNKNOWN"))

    return status_expr
def piecewise_linear_udf_factory(breaks_dict: Dict[str, Tuple[List[float], List[float]]]):
    # breaks_dict: {col -> (quantile_values, probs)}
    # 将每列的分位点广播到 executor
    from pyspark.sql.types import DoubleType
# 广播用的本地拷贝
local_breaks = {k: (np.array(v[0], dtype=float), np.array(v[1], dtype=float)) for k, v in breaks_dict.items()}
def build_one(colname: str):
qv, pv = local_breaks[colname]
def f_(x):
if x is None or math.isnan(x):
return None
# 左右夹逼
if x <= qv[0]:
return float(pv[0])
if x >= qv[-1]:
return float(pv[-1])
# 查找区间
i = np.searchsorted(qv, x, side='right') - 1
i = max(0, min(i, len(qv) - 2))
ql, qr = qv[i], qv[i+1]
pl, pr = pv[i], pv[i+1]
if qr <= ql + 1e-12:
return float(pl)
u = float(pl + (pr - pl) * (x - ql) / (qr - ql))
# 数值稳定性
return float(min(1.0, max(0.0, u)))
return F.udf(f_, DoubleType())
return build_one
def hampel_filter(series: pd.Series, window: int = 61, n_sigmas: float = 3.5) -> pd.Series:
    # 以滚动中位数 + MAD 标记异常
    if series.size == 0:
        return series
    x = series.copy()
    med = x.rolling(window=window, center=True, min_periods=1).median()
    mad = (x - med).abs().rolling(window=window, center=True, min_periods=1).median()
    scale = 1.4826 * mad
    # 避免除零
    scale_safe = scale.replace(0, np.nan)
    z = (x - med) / scale_safe
    mask = z.abs() > n_sigmas
    x[mask] = np.nan
    return x
def segment_ids(ts: pd.Series, gap_seconds: int = 600) -> pd.Series:
    # 对时间跳变或大间隔打断,避免插值跨段
    dt = ts.diff().dt.total_seconds()
    # 条件:负跳变或大于阈值
    cut = (dt < 0) | (dt > gap_seconds)
    seg = cut.cumsum().fillna(0).astype(int)
    return seg
@pandas_udf(
    "device_id string, timestamp timestamp, temp double, vibration double, rpm double, "
    "current double, voltage double, pressure double, status string, batch_id string, location string",
    PandasUDFType.GROUPED_MAP
)
def clean_and_resample(pdf: pd.DataFrame) -> pd.DataFrame:
    # 输入:同一 device_id 的秒级数据
    # 输出:同一 device_id 的分钟级数据(按中位数/众数聚合),并完成尖峰修复、插值、去重
    # 要求输入包含上述 schema 字段;其他字段可在外层 join 或额外处理
    if pdf.empty:
        return pdf
pdf = pdf.sort_values("timestamp").drop_duplicates(subset=["timestamp"], keep="last")
# 分段
seg = segment_ids(pdf["timestamp"])
pdf["__seg__"] = seg
num_cols = ["temp", "vibration", "rpm", "current", "voltage", "pressure"]
cat_cols = ["status", "batch_id", "location"]
# 尖峰过滤 + 分段内时间插值
for c in num_cols:
if c in pdf.columns:
# Hampel 标记异常为 NaN
pdf[c] = hampel_filter(pdf[c], window=61, n_sigmas=3.5)
# 分段内时间插值(不跨段)
            # 用 transform 保证结果索引与原数据对齐,避免 groupby-apply 产生多级索引
            pdf[c] = pdf.groupby("__seg__")[c].transform(
                lambda s: s.interpolate(method="linear", limit_area="inside")
            )
            # 双向填充(短缺口修复)
            pdf[c] = pdf.groupby("__seg__")[c].transform(lambda s: s.ffill().bfill())
# 类别缺失:段内/设备级众数
for c in cat_cols:
if c in pdf.columns:
# 段内众数
def mode_or_unknown(s):
if s.dropna().empty:
return "UNKNOWN" if c == "status" else None
return s.mode().iloc[0]
# 先段内填,残余再设备级众数
            pdf[c] = pdf.groupby("__seg__")[c].transform(lambda s: s.fillna(mode_or_unknown(s)))
if pdf[c].isna().any():
dev_mode = pdf[c].mode().iloc[0] if not pdf[c].dropna().empty else ("UNKNOWN" if c == "status" else None)
pdf[c] = pdf[c].fillna(dev_mode)
# 重采样到分钟级
pdf = pdf.set_index("timestamp")
rule = "1min"
agg_num = {c: "median" for c in num_cols if c in pdf.columns}
# 类别列:按分钟众数
def minute_mode(x):
if x.dropna().empty:
return np.nan
return x.mode().iloc[0]
agg_cat = {c: minute_mode for c in cat_cols if c in pdf.columns}
res = pdf.resample(rule).agg({**agg_num, **agg_cat})
# 恢复列与索引
res = res.reset_index()
# 保留 device_id(所有行为同一设备)
res["device_id"] = pdf["device_id"].iloc[0]
# 列顺序
cols = ["device_id", "timestamp"] + num_cols + cat_cols
res = res[[c for c in cols if c in res.columns]]
# 清理临时列
return res
def main():
    args = parse_args()
    spark = build_spark()
# 读入数据(自动识别 parquet/csv,建议 parquet)
if args.input.lower().endswith(".csv"):
df = spark.read.option("header", True).csv(args.input, inferSchema=True)
else:
df = spark.read.parquet(args.input)
# 必要字段检查
required = ["device_id", "timestamp", "temp", "status"]
missing = [c for c in required if c not in df.columns]
if missing:
raise ValueError(f"缺少必要字段: {missing}")
# 解析与统一时区至 UTC
if args.timestamp_format:
df = df.withColumn("timestamp", F.to_timestamp(F.col("timestamp"), args.timestamp_format))
else:
df = df.withColumn("timestamp", F.to_timestamp(F.col("timestamp")))
# 将本地时间解释为 input-tz,再转为 UTC
df = df.withColumn("timestamp", F.to_utc_timestamp(F.col("timestamp"), args.input_tz))
# 去重(device_id, timestamp)
df = df.dropDuplicates(["device_id", "timestamp"])
# 统一 status 标签
status_expr = map_status()
if "status" in df.columns:
df = df.withColumn("status", status_expr(F.col("status")))
else:
df = df.withColumn("status", F.lit("UNKNOWN"))
# 选择主要列(不存在则跳过)
keep_cols = ["device_id", "timestamp", "temp", "vibration", "rpm", "current", "voltage", "pressure", "status", "batch_id", "location"]
keep_cols = [c for c in keep_cols if c in df.columns]
df = df.select(*keep_cols)
# 设备级温度单位识别(启发式):中位数 > 200 判定为 Kelvin
if "temp" in df.columns:
med = (
df.groupBy("device_id")
.agg(F.expr("percentile_approx(temp, 0.5, 10000) as temp_median"))
)
df = df.join(med, on="device_id", how="left")
df = df.withColumn(
"temp",
F.when(F.col("temp_median") > F.lit(200), F.col("temp") - F.lit(273.15)).otherwise(F.col("temp"))
).drop("temp_median")
# 分设备稳健清洗 + 重采样到分钟级(Pandas GROUPED_MAP UDF)
# 注意:为了 UDF schema 对齐,确保缺失列补齐
for c, t in [("vibration", DoubleType()), ("rpm", DoubleType()), ("current", DoubleType()),
("voltage", DoubleType()), ("pressure", DoubleType()),
("batch_id", StringType()), ("location", StringType())]:
if c not in df.columns:
df = df.withColumn(c, F.lit(None).cast(t))
# 分组调用
cleaned = df.groupBy("device_id").apply(clean_and_resample)
# 残余缺失值处理:数值型字段插值后若仍缺失,按设备/全局中位数填充;类别型按设备/全局众数
num_cols = [c for c in ["temp", "vibration", "rpm", "current", "voltage", "pressure"] if c in cleaned.columns]
cat_cols = [c for c in ["status", "batch_id", "location"] if c in cleaned.columns]
# 设备级中位数
if num_cols:
dev_meds = cleaned.groupBy("device_id").agg(
*[F.expr(f"percentile_approx({c}, 0.5, 10000) as {c}_med") for c in num_cols]
)
cleaned = cleaned.join(dev_meds, on="device_id", how="left")
for c in num_cols:
cleaned = cleaned.withColumn(c, F.when(F.col(c).isNull(), F.col(f"{c}_med")).otherwise(F.col(c))) \
.drop(f"{c}_med")
# 全局中位数兜底
global_meds = cleaned.agg(*[F.expr(f"percentile_approx({c}, 0.5, 10000) as {c}_gmed") for c in num_cols]).collect()[0]
for c in num_cols:
g = global_meds[f"{c}_gmed"]
cleaned = cleaned.na.fill({c: float(g) if g is not None else 0.0})
# 类别众数(设备级 -> 全局)
for c in cat_cols:
dev_mode = (
cleaned.groupBy("device_id", c).count()
.withColumn("rn", F.row_number().over(Window.partitionBy("device_id").orderBy(F.desc("count"))))
.where(F.col("rn") == 1)
.select("device_id", F.col(c).alias(f"{c}_mode"))
)
cleaned = cleaned.join(dev_mode, on="device_id", how="left")
cleaned = cleaned.withColumn(c, F.coalesce(F.col(c), F.col(f"{c}_mode"), F.lit("UNKNOWN" if c=="status" else None))) \
.drop(f"{c}_mode")
# 全局众数兜底
gmode = (cleaned.groupBy(c).count().orderBy(F.desc("count")).limit(1).collect())
if gmode:
            gval = gmode[0][0]
            if gval is None and c == "status":
                gval = "UNKNOWN"
            # na.fill 不接受 None 作为填充值,值缺失时跳过该列
            if gval is not None:
                cleaned = cleaned.na.fill({c: gval})
# 可选:KNN 补齐剩余缺失(谨慎使用,默认关闭)
if args.enable_knn and num_cols:
# 这里提供设备内 KNN 的参考实现(对分钟级小样本较稳妥)
# 对于极大规模与跨设备 KNN,不建议在生产上启用(计算成本高)
@pandas_udf("device_id string, timestamp timestamp, " + ",".join([f"{c} double" for c in num_cols]), PandasUDFType.GROUPED_MAP)
def knn_impute(pdf: pd.DataFrame) -> pd.DataFrame:
from sklearn.impute import KNNImputer
pdf = pdf.sort_values("timestamp")
X = pdf[num_cols].values
imputer = KNNImputer(n_neighbors=3, weights="distance")
X_imputed = imputer.fit_transform(X)
out = pd.DataFrame({"device_id": pdf["device_id"].values, "timestamp": pdf["timestamp"].values})
for i, c in enumerate(num_cols):
out[c] = X_imputed[:, i]
return out
base_cols = ["device_id", "timestamp"] + num_cols
knned = cleaned.select(*[c for c in base_cols if c in cleaned.columns]).groupBy("device_id").apply(knn_impute)
cleaned = cleaned.drop(*num_cols).join(knned, on=["device_id", "timestamp"], how="left")
# 分位数变换至 U(0,1)(Piecewise 线性近似 ECDF)
# 计算每列分位点
breaks = {}
if num_cols:
probs = [i / args.quantile_bins for i in range(args.quantile_bins + 1)]
for c in num_cols:
qvals = cleaned.approxQuantile(c, probs, args.rel_error)
# 去重/单调修正,保证非降
qvals = np.maximum.accumulate(np.array(qvals, dtype=float))
breaks[c] = (qvals.tolist(), probs)
# 为每列构造 UDF 并变换
make_udf = piecewise_linear_udf_factory(breaks)
for c in num_cols:
ufun = make_udf(c)
cleaned = cleaned.withColumn(f"{c}_u", ufun(F.col(c)))
# 分区字段:日期(UTC)
cleaned = cleaned.withColumn("ds", F.to_date(F.col("timestamp")))
# 写出(Parquet,按设备与日期分区)
cleaned.write.mode("overwrite").partitionBy("device_id", "ds").parquet(args.output)
# 记录元数据:分位点、参数等(可选:落盘到同一路径下的 _meta/)
meta_path = args.output.rstrip("/") + "/_meta"
# 保存每列分位点为 JSON(仅示例)
if breaks:
meta_rows = []
for c, (qvals, probs) in breaks.items():
meta_rows.append((c, qvals, probs))
meta_df = spark.createDataFrame(meta_rows, schema="col string, qvals array<double>, probs array<double>")
meta_df.write.mode("overwrite").json(meta_path + "/quantile_breaks")
    spark.stop()


if __name__ == "__main__":
    main()
Implementation points and rationale:
- Session-level configs enable adaptive query execution and Arrow so the grouped-map pandas UDF stays efficient; duplicates are removed on (device_id, timestamp) before any per-device processing.
- Spikes are flagged with a Hampel filter (rolling median plus MAD) and repaired by linear interpolation that never crosses segment boundaries, where segments are cut at negative time jumps or gaps longer than 10 minutes.
- Resampling to one-minute granularity uses the median for numeric channels and the mode for categorical ones; residual gaps fall back to device-level and then global medians/modes.
- The quantile transform is approximated by a piecewise-linear ECDF built from approxQuantile breakpoints, which avoids sorting 120 million rows per column (see the sketch below).
Performance suggestions:
- Prefer Parquet input; tune spark.sql.shuffle.partitions to the cluster, and make sure each device's data fits in a single executor for the grouped-map UDF (split very large devices by day if necessary).
- Keep --quantile-bins in the 256 to 2048 range; higher values raise driver memory and UDF cost with little accuracy gain.
- Leave --enable-knn off for full-scale runs; the KNN path is intended only for small per-device minute-level samples.
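A minimal local sketch of that piecewise-linear ECDF idea, using exact numpy quantiles in place of approxQuantile (the data and names here are illustrative only):

import numpy as np

values = np.random.default_rng(0).lognormal(size=10_000)      # heavy-tailed toy column
bins = 1024
probs = np.linspace(0.0, 1.0, bins + 1)
breaks = np.quantile(values, probs)                            # exact stand-in for approxQuantile
breaks = np.maximum.accumulate(breaks)                         # enforce monotone breakpoints

def to_uniform(x: np.ndarray) -> np.ndarray:
    # linear interpolation of the ECDF, clipped to [0, 1] like the UDF in the script
    return np.clip(np.interp(x, breaks, probs), 0.0, 1.0)

u = to_uniform(values)                                         # approximately Uniform(0, 1)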
Helps data and business teams turn messy raw data into analysis-ready standardized data. With one precise prompt it generates a Python standardization script tailored to your data, covering missing-value handling, outlier treatment, numeric scaling, categorical normalization, and time-format cleanup, together with clear usage steps and caveats. Data preparation shrinks from hours to minutes, consistency and data quality improve, reliance on senior engineering resources drops, and reporting and strategy validation speed up, supporting more efficient and more reliable business decisions.
Quickly generate a unified standardization script, process multiple datasets in batch, shorten exploration and modeling preparation, and make conclusions more reliable.
Establish consistent data definitions for reports and dashboards, automate cleaning and encoding, reduce repetitive work and disputes over metrics, and keep the release cadence stable.
Record preprocessing as reproducible scripts, support repeated experiment comparisons, and keep thesis and project data consistent and trustworthy.
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in an MCP client so your AI application can call the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.