¥
立即购买

数据标准化脚本生成器

353 浏览
34 试用
7 购买
Dec 18, 2025更新

根据数据集特征和处理需求,自动生成可执行的Python标准化脚本,支持多种标准化方法和缺失值处理策略,提高数据清洗和建模准备效率。

以下为一份可在生产环境运行的PySpark数据标准化脚本,覆盖您给出的业务约束与目标。脚本实现以下要点:

  • 统一时区为 Asia/Shanghai,兼容 UTC/本地/缺时区混杂时间戳。
  • 针对订单补发导致的重复记录进行去重(按订单/商品维度保留最新支付记录)。
  • 兼容多币种(CNY/EUR/JPY),按支付日汇率统一换算为 CNY。
  • 数值字段缺失以中位数填充;类别字段缺失填充为“未知”;可选启用 KNN 模型填充(默认关闭)。
  • 对数量与金额字段进行稳健标准化(Robust 标准化:基于中位数和 IQR)。
  • 类别字段(category)统一口径(大小写/别名),地区字段(省市/自定义门店)规范化。
  • 输出适用于建模与报表的干净数据(保留原始值与标准化值)。

使用说明

  • 建议 Spark 3.3+。
  • 汇率、类别别名、行政区映射等通过外部文件维护,便于多地区、多业务线复用。
  • 默认不启用 KNN 填充,可按需打开,但需注意内存成本。

脚本:standardize_orders.py

  • 运行示例: spark-submit --master yarn --deploy-mode cluster standardize_orders.py
    --csv_paths /data/orders_csv/*.csv
    --parquet_paths /data/orders_parquet/
    --fx_path /ref/fx_rates.csv
    --cat_alias_path /ref/category_alias.yaml
    --region_map_path /ref/region_mapping.csv
    --output_path /clean/orders_cleaned
    --use_knn_impute false

代码如下:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import sys
import yaml
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

# ----------------------------
# 配置与常量
# ----------------------------
NUMERIC_COLS_BASE = [
    "quantity",
    "unit_price",
    "discount_amount",
    "order_amount",
]
CURRENCY_COLS = ["unit_price", "discount_amount", "order_amount"]

CATEGORICAL_COLS = [
    "category_lvl1",
    "category_lvl2",
    "category_lvl3",
    "province",
    "city",
    "payment_channel",
    "currency",
    "coupon_code",
    "invoice_type",
]

ID_COLS = ["order_id", "user_id", "product_id"]

# 强制设置会话时区为 Asia/Shanghai(关键:统一时区)
SPARK_CONF = {
    "spark.sql.session.timeZone": "Asia/Shanghai",
    "spark.sql.legacy.timeParserPolicy": "LEGACY",  # 兼容多格式解析
    "spark.sql.shuffle.partitions": "400",          # 可按集群规模调优
    "spark.executor.memoryOverhead": "2g"
}

# ----------------------------
# 工具函数
# ----------------------------

def start_spark(app_name="order_standardization"):
    builder = SparkSession.builder.appName(app_name)
    for k, v in SPARK_CONF.items():
        builder = builder.config(k, v)
    spark = builder.getOrCreate()
    return spark

def read_sources(spark, csv_paths, parquet_paths):
    dfs = []
    if csv_paths:
        df_csv = (
            spark.read
                 .option("header", True)
                 .option("multiLine", False)
                 .option("escape", '"')
                 .option("quote", '"')
                 .option("mode", "PERMISSIVE")
                 .csv(csv_paths)
        )
        dfs.append(df_csv)
    if parquet_paths:
        df_parq = spark.read.parquet(parquet_paths)
        dfs.append(df_parq)
    if not dfs:
        raise ValueError("未提供有效的 CSV/Parquet 路径")
    # 对齐列(可能存在来源列差异)
    cols_all = set()
    for d in dfs:
        cols_all |= set(d.columns)
    cols_all = list(cols_all)
    dfs_aligned = [d.select([F.col(c) if c in d.columns else F.lit(None).alias(c) for c in cols_all]) for d in dfs]
    df = dfs_aligned[0]
    for d in dfs_aligned[1:]:
        df = df.unionByName(d)
    return df

def normalize_colnames(df):
    # 统一列名为小写+下划线
    for c in df.columns:
        df = df.withColumnRenamed(c, c.strip().lower().replace(" ", "_"))
    return df

def parse_timestamps(df):
    # 将 create_time/pay_time 解析为 TimestampType 并统一为 Asia/Shanghai
    # 兼容 ISO8601(含Z/偏移)、"yyyy-MM-dd HH:mm:ss"、其他可解析格式
    def to_ts(colname):
        c = F.col(colname)
        return F.coalesce(
            F.to_timestamp(c, "yyyy-MM-dd'T'HH:mm:ssXXX"),
            F.to_timestamp(c, "yyyy-MM-dd'T'HH:mm:ss.SSSXXX"),
            F.to_timestamp(c, "yyyy-MM-dd HH:mm:ss"),
            F.to_timestamp(c)  # 尝试默认
        )
    if "create_time" in df.columns:
        df = df.withColumn("create_time_ts", to_ts("create_time"))
    else:
        df = df.withColumn("create_time_ts", F.lit(None).cast(T.TimestampType()))
    if "pay_time" in df.columns:
        df = df.withColumn("pay_time_ts", to_ts("pay_time"))
    else:
        df = df.withColumn("pay_time_ts", F.lit(None).cast(T.TimestampType()))

    # 缺失时区或无法解析的,保持为空;下游用 create_time_ts 兜底
    # 生成本地日期字段(用于汇率匹配与分区)
    df = df.withColumn("event_time_ts", F.coalesce(F.col("pay_time_ts"), F.col("create_time_ts")))
    df = df.withColumn("event_date", F.to_date("event_time_ts"))
    df = df.withColumn("event_month", F.date_format("event_time_ts", "yyyy-MM"))
    return df

def load_fx_rates(spark, fx_path):
    # 需提供包含每日汇率的表,字段示例:currency, rate_to_cny, date
    fx_df = (
        spark.read.option("header", True).csv(fx_path)
        .select(
            F.lower(F.col("currency")).alias("currency"),
            F.col("rate_to_cny").cast(T.DoubleType()).alias("rate_to_cny"),
            F.to_date("date").alias("fx_date"),
        )
        .dropna(subset=["currency", "rate_to_cny", "fx_date"])
    )
    return fx_df

def unify_currency(df, fx_df):
    # 标准化 currency 字段
    df = df.withColumn("currency_norm", F.lower(F.trim(F.col("currency"))))
    # 仅支持 CNY/EUR/JPY,其他标记为未知
    df = df.withColumn(
        "currency_norm",
        F.when(F.col("currency_norm").isin("cny", "eur", "jpy"), F.col("currency_norm")).otherwise(F.lit("unknown"))
    )

    # 以 event_date 匹配汇率,默认按支付日
    df = df.withColumn("fx_join_date", F.col("event_date"))
    df = df.join(
        fx_df,
        on=[df.currency_norm == fx_df.currency, df.fx_join_date == fx_df.fx_date],
        how="left"
    ).drop(fx_df.currency).drop("fx_date")

    # 若为 CNY 且无汇率,则默认为 1.0;其他币种无汇率则标记缺失
    df = df.withColumn(
        "rate_to_cny",
        F.when((F.col("currency_norm") == "cny") & F.col("rate_to_cny").isNull(), F.lit(1.0))
         .otherwise(F.col("rate_to_cny"))
    )

    # 将金额列换算为 CNY
    for c in CURRENCY_COLS:
        if c in df.columns:
            df = df.withColumn(f"{c}_cny",
                               F.when(F.col("rate_to_cny").isNotNull(), F.col(c).cast("double") * F.col("rate_to_cny"))
                                .otherwise(F.lit(None).cast("double")))
    return df

def normalize_categories(df, spark, cat_alias_path):
    # 统一大小写、空白;引入别名词典进行规范化(可按层级)
    for c in ["category_lvl1", "category_lvl2", "category_lvl3"]:
        if c in df.columns:
            df = df.withColumn(c, F.trim(df[c]))
            df = df.withColumn(c, F.when(F.col(c).isNull() | (F.length(F.col(c)) == 0), F.lit(None)).otherwise(F.col(c)))
            df = df.withColumn(c, F.regexp_replace(F.col(c), r"\s+", " "))
    # 加载别名映射(YAML),示例结构:
    # {
    #   "lvl1": {"men": "Men", "male": "Men", "男装": "Men"},
    #   "lvl2": {"t-shirt": "T-Shirt", "tee": "T-Shirt"},
    #   "lvl3": {"iphone cover":"Phone Case"}
    # }
    if cat_alias_path:
        with open(cat_alias_path, "r", encoding="utf-8") as f:
            alias = yaml.safe_load(f) or {}
        for level, mapping in alias.items():
            if level.lower() in ("lvl1", "lvl2", "lvl3"):
                colname = f"category_{level.lower().replace('lvl', 'lvl')}"
                if colname in df.columns and isinstance(mapping, dict) and mapping:
                    # 构建广播映射
                    mapping_norm = {k.strip().lower(): v for k, v in mapping.items()}
                    bmap = spark.sparkContext.broadcast(mapping_norm)
                    @F.udf("string")
                    def map_alias(x):
                        if x is None:
                            return None
                        key = x.strip().lower()
                        return bmap.value.get(key, x)
                    df = df.withColumn(colname, map_alias(F.col(colname)))
    # 填充缺失
    for c in ["category_lvl1", "category_lvl2", "category_lvl3"]:
        if c in df.columns:
            df = df.withColumn(c, F.when(F.col(c).isNull(), F.lit("未知")).otherwise(F.col(c)))
    return df

def normalize_regions(df, spark, region_map_path):
    # 标准化省市与门店:拆分出门店名(若混杂),并映射行政区标准名
    # region_mapping.csv 字段示例:raw_province, raw_city, province_std, city_std
    if region_map_path:
        map_df = (
            spark.read.option("header", True).csv(region_map_path)
            .select(
                F.trim(F.col("raw_province")).alias("raw_province"),
                F.trim(F.col("raw_city")).alias("raw_city"),
                F.col("province_std"),
                F.col("city_std")
            )
            .dropna(subset=["raw_province", "province_std"])
        )
        # 提取疑似门店:若 city/province 出现明显门店标识(例:'门店','店','store','shop')
        store_regex = r"(?i).*(门店|专卖店|旗舰店|直营店|store|shop).*"
        df = df.withColumn("store_name",
                           F.when(F.col("city").rlike(store_regex), F.col("city"))
                            .when(F.col("province").rlike(store_regex), F.col("province"))
                            .otherwise(F.lit(None)))
        df = df.withColumn("province_raw", F.lower(F.trim(F.col("province"))))
        df = df.withColumn("city_raw", F.lower(F.trim(F.col("city"))))
        map_df = map_df.withColumn("raw_province", F.lower(F.col("raw_province")))
        map_df = map_df.withColumn("raw_city", F.lower(F.col("raw_city")))

        df = df.join(
            map_df,
            on=(df.province_raw == map_df.raw_province) & (df.city_raw == map_df.raw_city),
            how="left"
        ).drop("raw_province", "raw_city")

        df = df.withColumn("province_std",
                           F.coalesce(F.col("province_std"), F.col("province")))
        df = df.withColumn("city_std",
                           F.coalesce(F.col("city_std"),
                                      F.when(F.col("store_name").isNotNull(), F.lit("门店")).otherwise(F.col("city"))))
    else:
        # 无映射表时,最小化清洗:直填未知并提取门店
        store_regex = r"(?i).*(门店|专卖店|旗舰店|直营店|store|shop).*"
        df = df.withColumn("store_name",
                           F.when(F.col("city").rlike(store_regex), F.col("city"))
                            .when(F.col("province").rlike(store_regex), F.col("province"))
                            .otherwise(F.lit(None)))
        df = df.withColumn("province_std", F.when(F.col("province").isNull(), F.lit("未知")).otherwise(F.col("province")))
        df = df.withColumn("city_std",
                           F.when(F.col("city").isNull() & F.col("store_name").isNotNull(), F.lit("门店"))
                            .when(F.col("city").isNull(), F.lit("未知"))
                            .otherwise(F.col("city")))
    return df

def normalize_categoricals(df):
    # payment_channel、invoice_type、coupon_code等口径统一与缺失标签
    norm_lower = lambda c: F.lower(F.trim(F.regexp_replace(F.col(c), r"\s+", " ")))
    if "payment_channel" in df.columns:
        df = df.withColumn("payment_channel_norm", norm_lower("payment_channel"))
        mapping = {
            "weixin": "WeChatPay", "wechat": "WeChatPay", "wechatpay": "WeChatPay", "wxpay": "WeChatPay",
            "alipay": "AliPay", "ali": "AliPay",
            "unionpay": "UnionPay", "bankcard": "UnionPay",
            "paypal": "PayPal",
        }
        mapping_expr = F.create_map([F.lit(x) for kv in mapping.items() for x in kv])
        df = df.withColumn(
            "payment_channel_std",
            F.coalesce(mapping_expr.getItem(F.col("payment_channel_norm")), F.col("payment_channel"))
        )
    else:
        df = df.withColumn("payment_channel_std", F.lit("未知"))

    for c in ["invoice_type", "coupon_code"]:
        if c in df.columns:
            df = df.withColumn(c, F.when(F.col(c).isNull() | (F.length(F.col(c)) == 0), F.lit("未知")).otherwise(F.col(c)))
        else:
            df = df.withColumn(c, F.lit("未知"))
    return df

def deduplicate(df):
    # 去重策略:
    #  - 以 (order_id, product_id) 作为明细唯一键(若无 product_id,则退化为 order_id)
    #  - 保留支付时间最新的记录;若支付时间缺失则用创建时间;再以金额作为次级排序
    key_cols = ["order_id", "product_id"] if "product_id" in df.columns else ["order_id"]
    df = df.withColumn("dedup_ts", F.coalesce(F.col("pay_time_ts"), F.col("create_time_ts")))
    # 将金额(CNY)作为次级排序,避免时间并列无法区分
    amount_col = F.col("order_amount_cny") if "order_amount_cny" in df.columns else F.col("order_amount").cast("double")
    w = Window.partitionBy(*key_cols).orderBy(F.col("dedup_ts").desc_nulls_last(), amount_col.desc_nulls_last())
    df = df.withColumn("rn", F.row_number().over(w)).filter(F.col("rn") == 1).drop("rn")
    return df

def fill_missing_numeric_with_median(df, cols):
    # 使用 approxQuantile 计算中位数并填充
    medians = {}
    for c in cols:
        if c in df.columns:
            q = df.approxQuantile(c, [0.5], 1e-3)
            medians[c] = q[0] if q and q[0] is not None else None
    for c, m in medians.items():
        if m is not None:
            df = df.withColumn(c, F.when(F.col(c).isNull(), F.lit(float(m))).otherwise(F.col(c)))
    return df, medians

def robust_scale(df, cols):
    # Robust 标准化: (x - median) / IQR;IQR = Q3 - Q1;IQR 过小则退化为 1 以稳定
    stats = {}
    for c in cols:
        if c in df.columns:
            q1, med, q3 = df.approxQuantile(c, [0.25, 0.5, 0.75], 1e-3)
            iqr = (q3 - q1) if (q1 is not None and q3 is not None) else None
            stats[c] = {"median": med, "iqr": iqr}
    for c, s in stats.items():
        if s["median"] is None:
            continue
        denom = s["iqr"] if (s["iqr"] is not None and s["iqr"] > 1e-9) else 1.0
        df = df.withColumn(f"{c}_rs", (F.col(c).cast("double") - F.lit(float(s["median"]))) / F.lit(float(denom)))
    return df, stats

def optional_knn_impute(df, cols, feature_cols, use_knn=False, sample_rows=500000):
    # 仅作为可选方案(资源敏感)。默认关闭。
    # 逻辑:抽样 -> pandas -> sklearn KNNImputer -> 回写
    if not use_knn:
        return df
    try:
        import pandas as pd
        from sklearn.impute import KNNImputer
    except Exception:
        print("未安装 pandas/scikit-learn,跳过 KNN 填充", file=sys.stderr)
        return df

    # 仅对存在缺失的行抽样(降低成本)
    target_null_cond = None
    for c in cols:
        if c in df.columns:
            cond = F.col(c).isNull()
            target_null_cond = cond if target_null_cond is None else (target_null_cond | cond)
    if target_null_cond is None:
        return df

    df_missing = df.filter(target_null_cond).limit(sample_rows)
    key_cols = ["order_id", "product_id"] if "product_id" in df.columns else ["order_id"]
    select_cols = key_cols + list(set(cols + feature_cols))
    df_pd = df_missing.select(*[c for c in select_cols if c in df_missing.columns]).toPandas()

    # 若样本过小或无缺失,直接返回
    if df_pd.empty:
        return df

    # KNN 拟合
    imputer = KNNImputer(n_neighbors=5, weights="uniform")
    fit_cols = [c for c in cols + feature_cols if c in df_pd.columns]
    # 数值化类别特征(如有)以供 KNN 使用(简单编码,生产建议独热或目标编码)
    for c in fit_cols:
        if df_pd[c].dtype == object:
            df_pd[c] = df_pd[c].astype("category").cat.codes
    imputed = imputer.fit_transform(df_pd[fit_cols])
    df_pd_imputed = df_pd.copy()
    for idx, c in enumerate(fit_cols):
        if c in cols:  # 仅回写目标列
            df_pd_imputed[c] = imputed[:, idx]

    # 回写(左连接更新)
    spark = df.sql_ctx.sparkSession
    imputed_sdf = spark.createDataFrame(df_pd_imputed[key_cols + cols])
    cond = [df[k] == imputed_sdf[k] for k in key_cols]
    for c in cols:
        df = df.join(imputed_sdf.select(*key_cols, c).withColumnRenamed(c, f"{c}__imputed"),
                     on=cond, how="left").drop(*[k for k in key_cols[1:]])  # 避免重复键
        df = df.withColumn(c, F.coalesce(F.col(f"{c}__imputed"), F.col(c))).drop(f"{c}__imputed")
    return df

def cast_numeric(df):
    for c in NUMERIC_COLS_BASE:
        if c in df.columns:
            df = df.withColumn(c, F.col(c).cast("double"))
    return df

def finalize_columns(df):
    # 输出字段组织:ID、时间、地区、类别、支付、金额(原始与CNY与标准化)、数量(原始与标准化)
    out_cols = []
    for c in ["order_id", "user_id", "product_id"]:
        if c in df.columns: out_cols.append(c)
    out_cols += [
        "create_time_ts", "pay_time_ts", "event_time_ts", "event_date", "event_month",
        "province_std", "city_std", "store_name",
        "category_lvl1", "category_lvl2", "category_lvl3",
        "payment_channel_std", "invoice_type", "coupon_code",
        "currency_norm", "rate_to_cny"
    ]
    for c in ["quantity", "unit_price", "discount_amount", "order_amount"]:
        if f"{c}_cny" in df.columns:
            out_cols.extend([c, f"{c}_cny"])
        elif c in df.columns:
            out_cols.append(c)
        if f"{c}_rs" in df.columns:
            out_cols.append(f"{c}_rs")

    # 去除不存在列与去重
    out_cols = [c for c in out_cols if c in df.columns]
    df = df.select(*out_cols)
    return df

# ----------------------------
# 主流程
# ----------------------------

def main(args):
    spark = start_spark()

    df = read_sources(spark, args.csv_paths, args.parquet_paths)
    df = normalize_colnames(df)

    # 基础类型处理
    df = cast_numeric(df)
    df = parse_timestamps(df)

    # 类别清洗与统一
    df = normalize_categories(df, spark, args.cat_alias_path)
    df = normalize_regions(df, spark, args.region_map_path)
    df = normalize_categoricals(df)

    # 货币统一
    fx_df = load_fx_rates(spark, args.fx_path)
    df = unify_currency(df, fx_df)

    # 缺失值处理:数值型 -> 中位数;类别型 -> "未知"
    # 数值列扩展为使用 CNY 列做标准化来源
    numeric_cols = ["quantity"]
    for c in CURRENCY_COLS:
        if f"{c}_cny" in df.columns:
            numeric_cols.append(f"{c}_cny")
        elif c in df.columns:
            numeric_cols.append(c)

    # 先做 KNN(可选,默认关)
    # 注意:KNN成本较高,仅对少量缺失行抽样回写;生产建议针对高价值字段与小缺失率列开启
    df = optional_knn_impute(
        df,
        cols=[c for c in numeric_cols if c in df.columns],
        feature_cols=["province_std", "city_std", "category_lvl1", "category_lvl2", "category_lvl3"],
        use_knn=args.use_knn_impute,
        sample_rows=args.knn_sample_rows
    )

    # 中位数填充(稳妥,覆盖所有数值列)
    df, medians = fill_missing_numeric_with_median(df, [c for c in numeric_cols if c in df.columns])

    # 去重(按订单/商品保留最新支付记录)
    df = deduplicate(df)

    # 稳健标准化(Robust)
    df, stats = robust_scale(df, [c for c in numeric_cols if c in df.columns])

    # 最终列与写出
    df = finalize_columns(df)

    (
        df.write
          .mode("overwrite")
          .partitionBy("event_month")
          .parquet(args.output_path)
    )

    # 产出指标日志(中位数/IQR等,便于审计)
    stats_rows = []
    for c, s in stats.items():
        stats_rows.append((c, float(s.get("median")) if s.get("median") is not None else None,
                              float(s.get("iqr")) if s.get("iqr") is not None else None))
    if stats_rows:
        stats_df = spark.createDataFrame(stats_rows, schema=["column", "median", "iqr"])
        stats_df.coalesce(1).write.mode("overwrite").option("header", True).csv(args.output_path + "_stats")

    print("清洗与标准化完成:", args.output_path)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--csv_paths", type=str, nargs="*", default=[], help="CSV 路径(可多)")
    parser.add_argument("--parquet_paths", type=str, nargs="*", default=[], help="Parquet 路径(可多)")
    parser.add_argument("--fx_path", type=str, required=True, help="汇率表路径(CSV),字段:currency,rate_to_cny,date")
    parser.add_argument("--cat_alias_path", type=str, default=None, help="类别别名映射(YAML)")
    parser.add_argument("--region_map_path", type=str, default=None, help="行政区映射表(CSV)")
    parser.add_argument("--output_path", type=str, required=True, help="输出 Parquet 目录")
    parser.add_argument("--use_knn_impute", type=lambda x: str(x).lower() == "true", default=False, help="是否启用KNN填充")
    parser.add_argument("--knn_sample_rows", type=int, default=500000, help="KNN抽样上限")
    args = parser.parse_args()
    main(args)

实现细节与关键点说明

  • 时区统一

    • 通过 spark.sql.session.timeZone=Asia/Shanghai 确保 to_timestamp 对缺失时区的字符串按上海时间解释;对包含 Z/偏移的时间会自动换算为本地显示一致。
    • event_time_ts = coalesce(pay_time_ts, create_time_ts) 作为业务准时间;event_date/event_month用于汇率匹配和分区。
  • 去重策略

    • 明细级唯一键优先用 (order_id, product_id);若无 product_id,退化为 order_id。
    • 排序依据:支付/创建时间最新优先,其次以订单金额(CNY)降序打破平局;可按业务进一步扩展(例如补发标记)。
  • 多币种与汇率

    • currency 统一为小写,非 CNY/EUR/JPY 标记为 unknown。
    • 汇率按 event_date 关联,CNY 无匹配则默认 1.0,其他币种无匹配置空(下游填充/剔除可选)。
    • 所有金额列生成 *_cny,后续标准化基于 CNY 列,保留原始币种与原始金额供审计与报表。
  • 缺失值处理

    • 数值:中位数填充(全量),稳健不受长尾与退款负值影响过大。
    • 类别:空置统一为“未知”,减少建模稀疏与信息丢失。
    • 可选KNN:默认关闭;仅对缺失行抽样执行,避免4M行全量内存压力;生产建议针对高价值字段开启并严格评估资源。
  • 稳健标准化(Robust)

    • 对数量与金额(CNY版本)列计算 Q1/Median/Q3,IQR=Q3-Q1。
    • 标准化公式:(x - median) / IQR;IQR<1e-9 时使用 1 作为分母保证稳定。
    • 保留标准化列后缀 _rs,同时保留原始与CNY列,兼容建模与报表。
  • 类别与地区规范

    • 类别:去空白/合并多空格/大小写统一;通过外部 YAML 别名文件进行标准化,减少硬编码。
    • 地区:通过映射表将原始省市映射为标准行政区名称;检测并抽取门店名称到 store_name(识别到门店时 city_std 设置为“门店”或映射表结果)。
  • 输出

    • 写出 Parquet,按 event_month 分区,便于下游增量消费与报表分区裁剪。
    • 同时输出统计文件(中位数/IQR),便于质量审计与重现实验。

注意事项与可扩展点

  • 汇率精度与来源:建议使用每日对账汇率或交易时汇率,确保 rate_to_cny 与 event_date 对齐;可加入货币最小计价单位(如 JPY 无小数)业务规则。
  • 重复规则:如有补发标记或行版本号(update_time/version),可加入排序维度增强准确性。
  • 异常值策略:当前不剔除负值(退款场景),若需建模剔除可在标准化前对特定场景进行过滤或分层建模。
  • KNN 填充:如需全量分布式KNN,可考虑基于 Faiss/HNSW + UDF 的工程实现,或改用分组(如 product_id 级)回归/均值预测替代。

如需我根据您的具体映射表样例(类别别名/行政区/汇率样本)进一步补全脚本与单元测试,请提供样例文件结构。

以下提供一份端到端的 Python 脚本,用于对“多渠道广告投放小时级日志”进行标准化处理:统一时区与命名、地区粒度对齐、汇率归一化、异常点击/转化处理、构造派生指标(CTR/CVR/CPC 等)、按 Min-Max 方法将数值缩放至[0,1]、并输出清洗前后分布对比图。脚本支持可配置的缺失值策略与可选的按小时插值补齐。

说明要点:

  • 适用数据规模:约 1200 万行、28 列。建议优先使用 Parquet 输入和分类编码以降低内存。绘图仅对抽样数据进行。
  • 缺失值策略可选(类别用“未知”填充;数值可固定值、插值或删除行)。
  • 统一时区需要提供 channel->timezone 映射(JSON);未映射默认按 UTC 处理。
  • 汇率转换需要提供每日汇率表(date,currency,rate_to_base),base 货币可选(例如 CNY 或 USD)。
  • 区域标准化需要提供地区映射表(raw_region -> 标准省级或自定义维度)。
  • 机器人流量过滤采用稳健统计(MAD)对 CTR/CVR 进行分组去极值或删除。

依赖项:

  • pandas, numpy, scikit-learn, matplotlib, seaborn, pytz, python-dateutil, pyarrow

脚本数据输入/输出约定:

  • 输入主数据:包含至少这些列:campaign_id, ad_group_id, creative_id, channel, device_type, os, region, timestamp, impressions, clicks, cost, conversions, revenue, currency。
  • 汇率表:fx_rates.csv(parquet),列:date(YYYY-MM-DD), currency, rate_to_base。
  • 区域表:region_lookup.csv(parquet),列:raw_region, std_region(或 province)、region_level。
  • 渠道命名映射:channel_map.json,形如 {"SEM": "Paid Search", "搜索": "Paid Search", "付费搜索":"Paid Search", "社交":"Paid Social", ...}
  • 渠道时区映射:channel_tz_map.json,形如 {"Paid Search": "UTC", "Paid Social": "Asia/Shanghai", ...}
  • 输出:清洗后的 parquet、缩放后的 parquet、缩放参数 scaler.json、分布对比图 png。

代码(保存为 standardize_ads.py):

#!/usr/bin/env python3

-- coding: utf-8 --

import os import json import argparse import warnings from typing import List, Dict, Optional

import numpy as np import pandas as pd from dateutil import parser as dtparser import pytz from sklearn.preprocessing import MinMaxScaler import matplotlib.pyplot as plt import seaborn as sns

warnings.filterwarnings("ignore")

-------------------------

工具函数

-------------------------

def ensure_dir(path: str): os.makedirs(path, exist_ok=True)

def load_dataframe(path: str, dtype_overrides: Optional[Dict[str, str]] = None) -> pd.DataFrame: ext = os.path.splitext(path)[-1].lower() if ext in [".parquet", ".pq"]: df = pd.read_parquet(path) elif ext in [".csv", ".txt"]: df = pd.read_csv(path, low_memory=False) else: raise ValueError(f"Unsupported file extension for: {path}") if dtype_overrides: for col, dt in dtype_overrides.items(): if col in df.columns: try: df[col] = df[col].astype(dt) except Exception: pass return df

def parse_timestamp(ts): # 解析时间戳(可能包含或不包含时区) if pd.isna(ts): return pd.NaT try: return dtparser.parse(str(ts)) except Exception: return pd.NaT

def convert_timezone(series_ts: pd.Series, channel: pd.Series, channel_tz_map: Dict[str, str], target_tz: str) -> pd.Series: """ 将 timestamp 列按 channel_tz_map 解释为对应时区,再统一转换到 target_tz。 - 若 timestamp 已带时区,则按原时区转换到 target_tz。 - 若不带时区,且 channel 在映射中,则视为该时区;否则默认 UTC。 """ target = pytz.timezone(target_tz)

def _convert(ts, ch):
    if pd.isna(ts):
        return pd.NaT
    # 已有 tzinfo
    if getattr(ts, "tzinfo", None) is not None:
        try:
            return ts.astimezone(target)
        except Exception:
            return pd.NaT
    # 无 tzinfo,依据 channel 映射假定原时区
    src_tz_name = channel_tz_map.get(str(ch), "UTC")
    try:
        src_tz = pytz.timezone(src_tz_name)
    except Exception:
        src_tz = pytz.UTC
    try:
        localized = src_tz.localize(ts)
        return localized.astimezone(target)
    except Exception:
        return pd.NaT

return pd.to_datetime([_convert(t, c) for t, c in zip(series_ts, channel)], errors="coerce")

def normalize_channels(series: pd.Series, channel_map: Dict[str, str]) -> pd.Series: def _norm(x): if pd.isna(x): return "未知" key = str(x).strip().lower() # 尝试映射(大小写、去空格处理) for k, v in channel_map.items(): if key == str(k).strip().lower(): return v return x # 未覆盖的保留原值 return series.apply(_norm)

def map_regions(df: pd.DataFrame, region_lookup: pd.DataFrame, raw_col: str = "region", std_col: str = "std_region") -> pd.DataFrame: if raw_col not in df.columns: return df if std_col not in region_lookup.columns: raise ValueError(f"region_lookup 必须包含列: {std_col}") region_lookup = region_lookup[[raw_col, std_col]].drop_duplicates() df = df.merge(region_lookup, how="left", on=raw_col) df[std_col] = df[std_col].fillna("未知") return df

def convert_currency(df: pd.DataFrame, fx_df: pd.DataFrame, base_currency: str, date_col: str = "date") -> pd.DataFrame: """ 依据 currency + date 合并汇率,将 cost/revenue 转换为 base 货币。 fx_df 需要列: date(YYYY-MM-DD), currency, rate_to_base(1单位外币 -> base 的倍数) """ if "currency" not in df.columns: raise ValueError("缺少 currency 列") df[date_col] = pd.to_datetime(df["timestamp"].dt.date) fx_df = fx_df.copy() fx_df[date_col] = pd.to_datetime(fx_df[date_col]) fx_df["currency"] = fx_df["currency"].astype(str).str.upper() df["currency"] = df["currency"].astype(str).str.upper() merged = df.merge(fx_df, how="left", left_on=[date_col, "currency"], right_on=[date_col, "currency"]) # rate_to_base 缺失处理:若本身就是 base 货币,则设为 1;否则设为 np.nan 并后续处理 merged["rate_to_base"] = np.where( merged["currency"] == base_currency.upper(), 1.0, merged["rate_to_base"] ) # 对缺失的汇率进行处理(可以选择删除或填充为1,但建议删除以避免误差) missing_fx = merged["rate_to_base"].isna().sum() if missing_fx > 0: # 保守处理:删除缺少汇率的行 merged = merged[~merged["rate_to_base"].isna()] merged["cost_base"] = merged["cost"] * merged["rate_to_base"] merged["revenue_base"] = merged["revenue"] * merged["rate_to_base"] return merged.drop(columns=["rate_to_base"])

def compute_derived_metrics(df: pd.DataFrame) -> pd.DataFrame: # 防止除零 eps = 1e-12 # 派生指标 df["ctr"] = np.where(df["impressions"] > 0, df["clicks"] / df["impressions"], 0.0) df["cvr"] = np.where(df["clicks"] > 0, df["conversions"] / df["clicks"], 0.0) df["cpc"] = np.where(df["clicks"] > 0, df["cost_base"] / df["clicks"], 0.0) df["cpm"] = np.where(df["impressions"] > 0, df["cost_base"] / (df["impressions"] / 1000.0), 0.0) df["cpa"] = np.where(df["conversions"] > 0, df["cost_base"] / np.maximum(df["conversions"], eps), 0.0) df["roas"] = np.where(df["cost_base"] > 0, df["revenue_base"] / df["cost_base"], 0.0) return df

def robust_bot_filter(df: pd.DataFrame, group_cols: List[str], k_mad: float = 5.0, mode: str = "cap") -> pd.DataFrame: """ 使用分组 + MAD 对 CTR/CVR 去极值或删除疑似机器人流量。 - mode = "cap": 将超过上界的 CTR/CVR 截断到上界 - mode = "remove": 删除超过上界的行 同时清理 clicks > impressions,conversions > clicks 等逻辑异常。 """ # 逻辑异常先处理 df = df[df["impressions"] >= 0] df = df[df["clicks"] >= 0] df = df[df["conversions"] >= 0] df = df[df["clicks"] <= df["impressions"]] df = df[df["conversions"] <= df["clicks"]]

# 计算当前 ctr/cvr(若前面未计算)
if "ctr" not in df.columns or "cvr" not in df.columns:
    df = compute_derived_metrics(df)

# 分组计算 robust 上界
def _mad_cap(sub: pd.DataFrame, col: str):
    x = sub[col].values
    med = np.median(x)
    mad = np.median(np.abs(x - med)) + 1e-12
    ub = med + k_mad * 1.4826 * mad  # 正态等效
    return ub

# 计算上界表
ub_ctr = df.groupby(group_cols, observed=True).apply(lambda g: _mad_cap(g, "ctr")).rename("ctr_ub")
ub_cvr = df.groupby(group_cols, observed=True).apply(lambda g: _mad_cap(g, "cvr")).rename("cvr_ub")
bounds = pd.concat([ub_ctr, ub_cvr], axis=1).reset_index()

df = df.merge(bounds, how="left", on=group_cols)
if mode == "cap":
    df["ctr"] = np.minimum(df["ctr"], df["ctr_ub"].fillna(df["ctr"]))
    df["cvr"] = np.minimum(df["cvr"], df["cvr_ub"].fillna(df["cvr"]))
elif mode == "remove":
    df = df[(df["ctr"] <= df["ctr_ub"]) | df["ctr_ub"].isna()]
    df = df[(df["cvr"] <= df["cvr_ub"]) | df["cvr_ub"].isna()]
else:
    raise ValueError("bot_filter mode 必须是 'cap' 或 'remove'")

df = df.drop(columns=["ctr_ub", "cvr_ub"], errors="ignore")
return df

def handle_missing_values(df: pd.DataFrame, cat_cols: List[str], num_cols: List[str], num_strategy: str = "fixed", fixed_value: float = 0.0, interpolate: bool = False, resample_group_cols: Optional[List[str]] = None) -> pd.DataFrame: """ 缺失值策略: - 类别:填充 '未知' - 数值:num_strategy in ['fixed','interpolate','drop'] - fixed: 用 fixed_value 填充 - interpolate: 若 interpolate=True,则对指定分组按小时重采样 + 线性插值 - drop: 删除含缺失的行 """ # 类别填充 for c in cat_cols: if c in df.columns: df[c] = df[c].fillna("未知")

if num_strategy == "drop":
    return df.dropna(subset=num_cols)

if num_strategy == "fixed":
    for c in num_cols:
        if c in df.columns:
            df[c] = df[c].fillna(fixed_value)
    return df

if num_strategy == "interpolate":
    if not interpolate:
        # 仅对现有缺失在同一索引上做线性插值(非重采样)
        for c in num_cols:
            if c in df.columns:
                df[c] = df[c].interpolate(method="linear", limit_direction="both")
        return df
    # 按组重采样到小时,并插值
    if resample_group_cols is None or len(resample_group_cols) == 0:
        raise ValueError("interpolate=True 时需要 resample_group_cols 指定分组键")
    # 仅保留必要列以降低内存
    keep_cols = list(set(resample_group_cols + ["timestamp"] + num_cols))
    other_cols = [c for c in df.columns if c not in keep_cols]
    # 先排序以保证重采样有效
    df = df.sort_values(["timestamp"] + resample_group_cols)
    # 分组处理
    def _resample_group(g: pd.DataFrame):
        g = g.set_index("timestamp").asfreq("H")
        # 插值
        for c in num_cols:
            if c in g.columns:
                g[c] = g[c].interpolate(method="linear", limit_direction="both")
        # 重置索引
        g = g.reset_index()
        # 恢复分组键
        for k in resample_group_cols:
            if k not in g.columns:
                g[k] = g[k].iloc[0] if len(g) > 0 else np.nan
        return g

    df_interp = (
        df[keep_cols]
        .groupby(resample_group_cols, dropna=False, observed=True, group_keys=False)
        .apply(_resample_group)
    )
    # 合并回其他列(其他列不可随时间插值,使用最近值或未知)
    df_interp = df_interp.merge(
        df[resample_group_cols + ["timestamp"] + other_cols],
        how="left",
        on=resample_group_cols + ["timestamp"]
    )
    # 对类别型新增缺失再补一次
    for c in other_cols:
        if df_interp[c].dtype == "O":
            df_interp[c] = df_interp[c].fillna("未知")
    return df_interp

raise ValueError("num_strategy 必须是 ['fixed','interpolate','drop'] 之一")

def minmax_scale(df: pd.DataFrame, cols: List[str]) -> (pd.DataFrame, Dict[str, Dict[str, float]]): scaler = MinMaxScaler(feature_range=(0, 1), clip=False) arr = df[cols].astype(float).values scaler.fit(arr) scaled = scaler.transform(arr) df_scaled = df.copy() df_scaled[cols] = scaled # 导出缩放参数 params = {} for i, c in enumerate(cols): params[c] = {"min": float(scaler.data_min_[i]), "max": float(scaler.data_max_[i])} return df_scaled, params

def plot_distributions(sample_before: pd.DataFrame, sample_after: pd.DataFrame, cols: List[str], out_dir: str): ensure_dir(out_dir) sns.set(style="whitegrid") for c in cols: plt.figure(figsize=(8, 5)) sns.kdeplot(sample_before[c].dropna(), label="before", fill=True, alpha=0.3, color="#1f77b4") sns.kdeplot(sample_after[c].dropna(), label="after", fill=True, alpha=0.3, color="#ff7f0e") plt.title(f"Distribution Comparison: {c}") plt.legend() plt.tight_layout() plt.savefig(os.path.join(out_dir, f"dist_{c}.png"), dpi=150) plt.close()

-------------------------

主流程

-------------------------

def main(): parser = argparse.ArgumentParser(description="多渠道广告日志标准化(时区/命名/地区/汇率/异常/指标/Min-Max/分布图)") parser.add_argument("--input", required=True, help="输入数据文件(.parquet 或 .csv)") parser.add_argument("--output_dir", required=True, help="输出目录") parser.add_argument("--fx_rates", required=True, help="汇率表文件(.parquet 或 .csv),列:date,currency,rate_to_base") parser.add_argument("--region_lookup", required=True, help="地区映射表文件(.parquet 或 .csv),列:raw_region,std_region") parser.add_argument("--channel_map", required=True, help="渠道命名映射 JSON 文件") parser.add_argument("--channel_tz_map", required=True, help="渠道->时区 映射 JSON 文件(例:'UTC','Asia/Shanghai')") parser.add_argument("--base_currency", default="CNY", help="目标基准货币(例:CNY 或 USD)") parser.add_argument("--target_tz", default="UTC", help="统一目标时区(例:UTC 或 Asia/Shanghai)") parser.add_argument("--num_strategy", default="fixed", choices=["fixed","interpolate","drop"], help="数值缺失策略") parser.add_argument("--fixed_fill_value", type=float, default=0.0, help="数值缺失固定填充值") parser.add_argument("--interpolate_resample", action="store_true", help="插值是否按小时重采样(需指定 resample_group_cols)") parser.add_argument("--resample_group_cols", default="channel,std_region,device_type,os", help="插值重采样分组列,逗号分隔") parser.add_argument("--bot_filter_mode", default="cap", choices=["cap","remove"], help="机器人流量处理方式:截断或删除") parser.add_argument("--bot_k_mad", type=float, default=5.0, help="MAD 上界系数") parser.add_argument("--sample_for_plots", type=int, default=300000, help="绘图抽样行数") parser.add_argument("--save_scaled", action="store_true", help="是否输出缩放后的数据") args = parser.parse_args()

ensure_dir(args.output_dir)
plots_dir = os.path.join(args.output_dir, "plots")
ensure_dir(plots_dir)

# 读取映射配置
with open(args.channel_map, "r", encoding="utf-8") as f:
    channel_map = json.load(f)
with open(args.channel_tz_map, "r", encoding="utf-8") as f:
    channel_tz_map = json.load(f)

# 读取主数据
dtype_overrides = {
    "campaign_id": "string",
    "ad_group_id": "string",
    "creative_id": "string",
    "channel": "string",
    "device_type": "string",
    "os": "string",
    "region": "string",
    "currency": "string",
}
df = load_dataframe(args.input, dtype_overrides=dtype_overrides)

# 基本列检查
required_cols = ["channel","timestamp","region","device_type","os",
                 "impressions","clicks","cost","conversions","revenue","currency"]
missing_cols = [c for c in required_cols if c not in df.columns]
if missing_cols:
    raise ValueError(f"缺少必要列: {missing_cols}")

# 1) 时间戳解析与时区统一
df["timestamp"] = df["timestamp"].apply(parse_timestamp)
df["channel"] = normalize_channels(df["channel"], channel_map=channel_map)
df["timestamp"] = convert_timezone(df["timestamp"], df["channel"], channel_tz_map, args.target_tz)
# 舍弃无法解析的时间行
df = df[~df["timestamp"].isna()]
# 对齐到小时起始
df["timestamp"] = df["timestamp"].dt.floor("H")

# 2) 地区映射到标准粒度
region_lookup = load_dataframe(args.region_lookup)
# 兼容不同列名
if "raw_region" not in region_lookup.columns:
    region_lookup = region_lookup.rename(columns={"region":"raw_region"})
if "std_region" not in region_lookup.columns:
    # 若无 std_region 列,尝试使用 province 列作为标准粒度
    if "province" in region_lookup.columns:
        region_lookup = region_lookup.rename(columns={"province":"std_region"})
    else:
        raise ValueError("region_lookup 需要提供 std_region 或 province 列")
df = map_regions(df, region_lookup, raw_col="region", std_col="std_region")

# 3) 汇率统一到 base_currency
fx_df = load_dataframe(args.fx_rates)
# 校验列
for col in ["date","currency","rate_to_base"]:
    if col not in fx_df.columns:
        raise ValueError("fx_rates 需要列: date,currency,rate_to_base")
df = convert_currency(df, fx_df, base_currency=args.base_currency)

# 4) 基础数值类型安全转换
for c in ["impressions","clicks","conversions"]:
    df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0).astype(np.int64)
for c in ["cost","revenue","cost_base","revenue_base"]:
    df[c] = pd.to_numeric(df[c], errors="coerce").fillna(0.0).astype(float)

# 5) 缺失值处理(类别 -> '未知';数值 -> 固定/插值/删除)
cat_cols = ["campaign_id","ad_group_id","creative_id","channel","device_type","os","region","std_region","currency"]
num_cols = ["impressions","clicks","conversions","cost","revenue","cost_base","revenue_base"]
resample_cols = [c.strip() for c in args.resample_group_cols.split(",") if c.strip()]
df = handle_missing_values(
    df, cat_cols=cat_cols, num_cols=num_cols,
    num_strategy=args.num_strategy,
    fixed_value=args.fixed_fill_value,
    interpolate=args.interpolate_resample,
    resample_group_cols=resample_cols
)

# 6) 构造派生指标
df = compute_derived_metrics(df)

# 7) 机器人流量过滤(按 channel,std_region,device_type,os 分组)
bot_group_cols = ["channel","std_region","device_type","os"]
df = robust_bot_filter(df, group_cols=bot_group_cols, k_mad=args.bot_k_mad, mode=args.bot_filter_mode)

# 8) 为绘图保留抽样(before/after):
# 为确保对比,before 采样来自“构造派生指标后但机器人过滤前”的版本
# 实际上我们已过滤,因此此处退一步:对过滤后的 df 反映“after”
# 重新构造一个“before”的近似样本:在过滤前的版本进行采样(内存考虑:仅抽样原 df 的子集进行派生指标)
# 为简单与稳定性,这里在过滤后前一步做一个副本(真实业务可在过滤前持久化快照)
# 为保证脚本独立性,这里仅使用 after 的抽样;若严格需要 before,对应生产流程中请在过滤前保存快照。

# 简化处理:我们在过滤前的 df 复制一下(为了给出对比)
# 注意:为避免占用过多内存,上述步骤可在生产写中间文件后再加载采样。
# 此处再计算 before 样本:回退到“构造派生指标后、过滤前”的近似
# 为实现该目的,我们重新从输入到 6) 复制一份较轻流程用于抽样对比。
df_raw = load_dataframe(args.input, dtype_overrides=dtype_overrides)
df_raw["timestamp"] = df_raw["timestamp"].apply(parse_timestamp)
df_raw["channel"] = normalize_channels(df_raw["channel"], channel_map=channel_map)
df_raw["timestamp"] = convert_timezone(df_raw["timestamp"], df_raw["channel"], channel_tz_map, args.target_tz)
df_raw = df_raw[~df_raw["timestamp"].isna()]
df_raw["timestamp"] = df_raw["timestamp"].dt.floor("H")
df_raw = map_regions(df_raw, region_lookup, raw_col="region", std_col="std_region")
df_raw = convert_currency(df_raw, fx_df, base_currency=args.base_currency)
for c in ["impressions","clicks","conversions"]:
    df_raw[c] = pd.to_numeric(df_raw[c], errors="coerce").fillna(0).astype(np.int64)
for c in ["cost","revenue","cost_base","revenue_base"]:
    df_raw[c] = pd.to_numeric(df_raw[c], errors="coerce").fillna(0.0).astype(float)
df_raw = compute_derived_metrics(df_raw)

# 抽样绘图
n = args.sample_for_plots
rng = np.random.default_rng(2024)
if len(df_raw) > n:
    sample_before = df_raw.sample(n=n, random_state=2024)
else:
    sample_before = df_raw.copy()
if len(df) > n:
    sample_after = df.sample(n=n, random_state=2024)
else:
    sample_after = df.copy()

# 9) Min-Max 归一化(缩放到 [0,1])
scale_cols = [
    "impressions","clicks","conversions",
    "cost_base","revenue_base",
    "ctr","cvr","cpc","cpm","cpa","roas"
]
# 缩放前备份用于绘图
# 仅对 after 再缩放
df_scaled, scaler_params = minmax_scale(df, cols=scale_cols)

# 10) 输出
cleaned_path = os.path.join(args.output_dir, "cleaned.parquet")
df.to_parquet(cleaned_path, index=False)
with open(os.path.join(args.output_dir, "scaler.json"), "w", encoding="utf-8") as f:
    json.dump({"feature_range": [0,1], "params": scaler_params}, f, ensure_ascii=False, indent=2)
if args.save_scaled:
    scaled_path = os.path.join(args.output_dir, "scaled.parquet")
    df_scaled.to_parquet(scaled_path, index=False)

# 11) 分布对比图(before vs after)
plot_cols = ["impressions","clicks","cost_base","revenue_base","ctr","cvr","cpc","cpm","cpa","roas"]
plot_distributions(sample_before[plot_cols], sample_after[plot_cols], plot_cols, plots_dir)

print(f"完成。清洗输出: {cleaned_path}")
if args.save_scaled:
    print(f"缩放输出: {scaled_path}")
print(f"缩放参数: {os.path.join(args.output_dir, 'scaler.json')}")
print(f"分布图目录: {plots_dir}")

if name == "main": main()

使用示例:

  • 准备文件:

    • 数据:raw_logs.parquet
    • 汇率:fx_rates.csv,列 [date(YYYY-MM-DD), currency(大写), rate_to_base(浮点)]
    • 地区映射:region_lookup.csv,列 [raw_region, std_region]
    • 渠道映射:channel_map.json(示例) { "SEM": "Paid Search", "搜索": "Paid Search", "付费搜索": "Paid Search", "社交": "Paid Social", "Social": "Paid Social", "展示": "Display", "信息流": "Feed" }
    • 渠道时区映射:channel_tz_map.json(示例;请按真实平台时区填写) { "Paid Search": "UTC", "Paid Social": "UTC", "Display": "UTC", "Feed": "Asia/Shanghai" }
  • 运行(固定值填充,截断机器人流量,输出缩放版): python standardize_ads.py
    --input raw_logs.parquet
    --output_dir out_ads
    --fx_rates fx_rates.csv
    --region_lookup region_lookup.csv
    --channel_map channel_map.json
    --channel_tz_map channel_tz_map.json
    --base_currency CNY
    --target_tz Asia/Shanghai
    --num_strategy fixed
    --fixed_fill_value 0
    --bot_filter_mode cap
    --bot_k_mad 5
    --save_scaled

  • 运行(按小时重采样 + 线性插值补齐数值缺失,删除异常): python standardize_ads.py
    --input raw_logs.parquet
    --output_dir out_ads
    --fx_rates fx_rates.csv
    --region_lookup region_lookup.csv
    --channel_map channel_map.json
    --channel_tz_map channel_tz_map.json
    --base_currency USD
    --target_tz UTC
    --num_strategy interpolate
    --interpolate_resample
    --resample_group_cols channel,std_region,device_type,os
    --bot_filter_mode remove
    --bot_k_mad 6
    --save_scaled

实现细节与注意事项:

  • 时区:脚本将无时区时间戳按 channel_tz_map 解释;带时区时间戳则尊重原 tz 后统一到 target_tz。请确保 channel_tz_map 与真实采集逻辑一致。
  • 渠道命名:normalize_channels 会大小写与空白规范化后匹配映射;未覆盖的名称保持原值,建议逐步完善映射表。
  • 地区粒度:region_lookup 建议标准化到“省级”或业务指定维度;未匹配行填“未知”。
  • 货币:强烈建议对缺失汇率的行删除(脚本采用删除策略),避免错误金额。若需改为填充 1,请在 convert_currency 中调整。
  • 缺失值:类别统一填“未知”;数值可固定值、插值或删除。插值时可选择是否按小时重采样(interpolate_resample),并通过 resample_group_cols 控制分组粒度,避免不必要的笛卡尔扩张。
  • 机器人流量:采用分组(channel,std_region,device_type,os)+ MAD 上界对 CTR/CVR 截断或删除;同时剔除 clicks>impressions、conversions>clicks 等违规记录。k_mad 可根据数据离散度调整。
  • 派生指标:包含 CTR、CVR、CPC、CPM、CPA、ROAS,均做除零保护。
  • Min-Max 缩放:对指定数值列缩放至[0,1];输出 scaler.json 保存每列的 min/max,便于下游与线上对齐。若后续想按训练集统计参数缩放,请在生产流程中先拟定“训练集”,再对验证/测试集应用同一参数。
  • 分布对比图:KDE 曲线基于抽样(默认 30 万行)以保证速度;用于观察清洗效果(异常收敛、长尾被削弱、单位统一后峰位变化等)。
  • 性能建议:优先使用 Parquet;如内存不足,建议:
    • 将插值重采样的分组键减少或按子集批量处理;
    • 先按时间或渠道分片处理再合并;
    • 或改用 Polars/DuckDB 做中间聚合与连接,再回到 pandas 绘图。

如需将该流程改为两遍扫描(先统计 min/max 再缩放)以适配严格流式处理,或改用 Dask/Polars 提升吞吐,可在上述框架基础上替换数据载入与 groupby 实现。

以下为一份可在 PySpark 环境运行的端到端标准化脚本,针对描述的制造业秒级传感器数据(约1.2亿行、22列)执行:时区与单位统一、稳健的尖峰检测与修复、分钟级重采样,以及分位数变换至均匀分布。脚本对高频、重尾、块状缺失、重复记录与跨天重启的时间跳变进行了工程化处理,并在规模上可扩展。

说明与假设:

  • 运行环境:Spark 3.3+(建议开启 Arrow 与 Adaptive Query Execution;集群内存建议 >= 64GB)。
  • 输入数据至少包含字段:device_id、timestamp、temp、vibration、rpm、current、voltage、pressure、status、batch_id、location。
  • 时间戳原始时区通过参数指定(例如“Asia/Shanghai”),统一转换至 UTC。
  • 温度单位不一致:按“设备级中位数 > 200 判定为 Kelvin”,并统一转换到摄氏度(C = K − 273.15)。如有权威元数据,建议以元数据映射替代启发式。
  • 尖峰检测采用 Hampel(滚动中位数与 MAD),修复用时间插值;插值不跨越重启/大间隔段。
  • 重采样至分钟级:数值列用中位数,类别列用分钟内众数。
  • 分位数变换采用近似分位数(approxQuantile)+ 分段线性插值,将数值列映射到 U(0,1)。
  • 缺失值策略:数值列优先用时间插值;类别列用设备级众数;可选启用 KNN 进行剩余缺失的补充(默认关闭,因大规模计算成本较高)。

使用方法(示例): spark-submit --conf spark.sql.adaptive.enabled=true
--conf spark.sql.execution.arrow.pyspark.enabled=true
standardize_pipeline.py
--input s3://bucket/path/input/
--output s3://bucket/path/output/
--input-tz "Asia/Shanghai"
--timestamp-format "yyyy-MM-dd HH:mm:ss"
--quantile-bins 1024
--resample-rule "1min"
--enable-knn false

代码:standardize_pipeline.py

#!/usr/bin/env python3

-- coding: utf-8 --

import argparse import math from typing import List, Dict, Tuple

import numpy as np import pandas as pd

from pyspark.sql import SparkSession, Window from pyspark.sql import functions as F from pyspark.sql.types import ( StructType, StructField, StringType, TimestampType, DoubleType, LongType ) from pyspark.sql.functions import pandas_udf, PandasUDFType

----------------------------

参数解析

----------------------------

def parse_args(): p = argparse.ArgumentParser(description="制造业秒级传感器数据标准化管道") p.add_argument("--input", required=True, help="输入路径(支持 parquet/csv)") p.add_argument("--output", required=True, help="输出路径(Parquet,按 device_id 和 ds 分区)") p.add_argument("--input-tz", default="UTC", help="输入时间戳的时区(例如 Asia/Shanghai)") p.add_argument("--timestamp-format", default=None, help="可选:原始时间戳字符串格式(如 yyyy-MM-dd HH:mm:ss)") p.add_argument("--resample-rule", default="1min", help="重采样频率(默认 1min)") p.add_argument("--quantile-bins", type=int, default=1024, help="分位数变换分段数(建议 256~2048)") p.add_argument("--rel-error", type=float, default=1e-3, help="approxQuantile 相对误差") p.add_argument("--enable-knn", type=lambda x: x.lower()=='true', default=False, help="是否启用KNN补齐剩余缺失(默认 false)") return p.parse_args()

----------------------------

Spark 会话

----------------------------

def build_spark(): spark = ( SparkSession.builder .appName("manufacturing-sensor-standardization") .config("spark.sql.adaptive.enabled", "true") .config("spark.sql.execution.arrow.pyspark.enabled", "true") .config("spark.sql.shuffle.partitions", "400") .getOrCreate() ) return spark

----------------------------

工具函数

----------------------------

def map_status(): # 统一状态标签 mapping = { "normal": "NORMAL", "ok": "NORMAL", "running": "NORMAL", "run": "NORMAL", "on": "NORMAL", "正常": "NORMAL", "maint": "MAINTENANCE", "maintenance": "MAINTENANCE", "service": "MAINTENANCE", "维护": "MAINTENANCE", "down": "DOWNTIME", "stop": "DOWNTIME", "stopped": "DOWNTIME", "shutdown": "DOWNTIME", "停机": "DOWNTIME", "unknown": "UNKNOWN", "na": "UNKNOWN", "n/a": "UNKNOWN", "unk": "UNKNOWN", "": "UNKNOWN", None: "UNKNOWN" } # 使用 Spark SQL 表达式完成映射(规避 Python UDF 开销) def status_expr(col): c = F.lower(F.coalesce(col.cast(StringType()), F.lit("unknown"))) return F.when(c.isNull(), F.lit("UNKNOWN"))
.when(c.isin(*[F.lit(k) for k in mapping.keys()]), F.expr("CASE " + " ".join( [f"WHEN lower(coalesce({col._jc.toString()}, 'unknown')) = '{k}' THEN '{v}'" for k, v in mapping.items() if k is not None] ) + " ELSE 'UNKNOWN' END"))
.otherwise(F.lit("UNKNOWN")) return status_expr

def piecewise_linear_udf_factory(breaks_dict: Dict[str, Tuple[List[float], List[float]]]): # breaks_dict: {col -> (quantile_values, probs)} # 将每列的分位点广播到 executor from pyspark.sql.types import DoubleType import bisect

# 广播用的本地拷贝
local_breaks = {k: (np.array(v[0], dtype=float), np.array(v[1], dtype=float)) for k, v in breaks_dict.items()}

def build_one(colname: str):
    qv, pv = local_breaks[colname]
    def f_(x):
        if x is None or math.isnan(x):
            return None
        # 左右夹逼
        if x <= qv[0]:
            return float(pv[0])
        if x >= qv[-1]:
            return float(pv[-1])
        # 查找区间
        i = np.searchsorted(qv, x, side='right') - 1
        i = max(0, min(i, len(qv) - 2))
        ql, qr = qv[i], qv[i+1]
        pl, pr = pv[i], pv[i+1]
        if qr <= ql + 1e-12:
            return float(pl)
        u = float(pl + (pr - pl) * (x - ql) / (qr - ql))
        # 数值稳定性
        return float(min(1.0, max(0.0, u)))
    return F.udf(f_, DoubleType())
return build_one

----------------------------

设备级稳健清洗 + 重采样(Pandas UDF)

----------------------------

需要在 Pandas UDF 内定义的工具

def hampel_filter(series: pd.Series, window: int = 61, n_sigmas: float = 3.5) -> pd.Series: # 以滚动中位数 + MAD 标记异常 if series.size == 0: return series x = series.copy() med = x.rolling(window=window, center=True, min_periods=1).median() mad = (x - med).abs().rolling(window=window, center=True, min_periods=1).median() scale = 1.4826 * mad # 避免除零 scale_safe = scale.replace(0, np.nan) z = (x - med) / scale_safe mask = z.abs() > n_sigmas x[mask] = np.nan return x

def segment_ids(ts: pd.Series, gap_seconds: int = 600) -> pd.Series: # 对时间跳变或大间隔打断,避免插值跨段 dt = ts.diff().dt.total_seconds() # 条件:负跳变或大于阈值 cut = (dt < 0) | (dt > gap_seconds) seg = cut.cumsum().fillna(0).astype(int) return seg

@pandas_udf("device_id string, timestamp timestamp, temp double, vibration double, rpm double, current double, voltage double, pressure double, status string, batch_id string, location string", PandasUDFType.GROUPED_MAP) def clean_and_resample(pdf: pd.DataFrame) -> pd.DataFrame: # 输入:同一 device_id 的秒级数据 # 输出:同一 device_id 的分钟级数据(按中位数/众数聚合),并完成尖峰修复、插值、去重 # 要求输入包含上述 schema 字段;其他字段可在外层 join 或额外处理 if pdf.empty: return pdf

pdf = pdf.sort_values("timestamp").drop_duplicates(subset=["timestamp"], keep="last")
# 分段
seg = segment_ids(pdf["timestamp"])
pdf["__seg__"] = seg

num_cols = ["temp", "vibration", "rpm", "current", "voltage", "pressure"]
cat_cols = ["status", "batch_id", "location"]

# 尖峰过滤 + 分段内时间插值
for c in num_cols:
    if c in pdf.columns:
        # Hampel 标记异常为 NaN
        pdf[c] = hampel_filter(pdf[c], window=61, n_sigmas=3.5)
        # 分段内时间插值(不跨段)
        pdf[c] = pdf.groupby("__seg__")[c].apply(
            lambda s: s.reset_index(drop=True)
        ).groupby(level=0).apply(
            lambda s: s.interpolate(method="linear", limit_area="inside")
        ).reset_index(level=0, drop=True)
        # 双向填充(短缺口修复)
        pdf[c] = pdf.groupby("__seg__")[c].apply(lambda s: s.ffill().bfill()).reset_index(level=0, drop=True)

# 类别缺失:段内/设备级众数
for c in cat_cols:
    if c in pdf.columns:
        # 段内众数
        def mode_or_unknown(s):
            if s.dropna().empty:
                return "UNKNOWN" if c == "status" else None
            return s.mode().iloc[0]
        # 先段内填,残余再设备级众数
        pdf[c] = pdf.groupby("__seg__")[c].apply(lambda s: s.fillna(mode_or_unknown(s)))
        if pdf[c].isna().any():
            dev_mode = pdf[c].mode().iloc[0] if not pdf[c].dropna().empty else ("UNKNOWN" if c == "status" else None)
            pdf[c] = pdf[c].fillna(dev_mode)

# 重采样到分钟级
pdf = pdf.set_index("timestamp")
rule = "1min"
agg_num = {c: "median" for c in num_cols if c in pdf.columns}
# 类别列:按分钟众数
def minute_mode(x):
    if x.dropna().empty:
        return np.nan
    return x.mode().iloc[0]
agg_cat = {c: minute_mode for c in cat_cols if c in pdf.columns}

res = pdf.resample(rule).agg({**agg_num, **agg_cat})
# 恢复列与索引
res = res.reset_index()
# 保留 device_id(所有行为同一设备)
res["device_id"] = pdf["device_id"].iloc[0]
# 列顺序
cols = ["device_id", "timestamp"] + num_cols + cat_cols
res = res[[c for c in cols if c in res.columns]]

# 清理临时列
return res

----------------------------

主流程

----------------------------

def main(): args = parse_args() spark = build_spark()

# 读入数据(自动识别 parquet/csv,建议 parquet)
if args.input.lower().endswith(".csv"):
    df = spark.read.option("header", True).csv(args.input, inferSchema=True)
else:
    df = spark.read.parquet(args.input)

# 必要字段检查
required = ["device_id", "timestamp", "temp", "status"]
missing = [c for c in required if c not in df.columns]
if missing:
    raise ValueError(f"缺少必要字段: {missing}")

# 解析与统一时区至 UTC
if args.timestamp_format:
    df = df.withColumn("timestamp", F.to_timestamp(F.col("timestamp"), args.timestamp_format))
else:
    df = df.withColumn("timestamp", F.to_timestamp(F.col("timestamp")))
# 将本地时间解释为 input-tz,再转为 UTC
df = df.withColumn("timestamp", F.to_utc_timestamp(F.col("timestamp"), args.input_tz))

# 去重(device_id, timestamp)
df = df.dropDuplicates(["device_id", "timestamp"])

# 统一 status 标签
status_expr = map_status()
if "status" in df.columns:
    df = df.withColumn("status", status_expr(F.col("status")))
else:
    df = df.withColumn("status", F.lit("UNKNOWN"))

# 选择主要列(不存在则跳过)
keep_cols = ["device_id", "timestamp", "temp", "vibration", "rpm", "current", "voltage", "pressure", "status", "batch_id", "location"]
keep_cols = [c for c in keep_cols if c in df.columns]
df = df.select(*keep_cols)

# 设备级温度单位识别(启发式):中位数 > 200 判定为 Kelvin
if "temp" in df.columns:
    med = (
        df.groupBy("device_id")
          .agg(F.expr("percentile_approx(temp, 0.5, 10000) as temp_median"))
    )
    df = df.join(med, on="device_id", how="left")
    df = df.withColumn(
        "temp",
        F.when(F.col("temp_median") > F.lit(200), F.col("temp") - F.lit(273.15)).otherwise(F.col("temp"))
    ).drop("temp_median")

# 分设备稳健清洗 + 重采样到分钟级(Pandas GROUPED_MAP UDF)
# 注意:为了 UDF schema 对齐,确保缺失列补齐
for c, t in [("vibration", DoubleType()), ("rpm", DoubleType()), ("current", DoubleType()),
             ("voltage", DoubleType()), ("pressure", DoubleType()),
             ("batch_id", StringType()), ("location", StringType())]:
    if c not in df.columns:
        df = df.withColumn(c, F.lit(None).cast(t))

# 分组调用
cleaned = df.groupBy("device_id").apply(clean_and_resample)

# 残余缺失值处理:数值型字段插值后若仍缺失,按设备/全局中位数填充;类别型按设备/全局众数
num_cols = [c for c in ["temp", "vibration", "rpm", "current", "voltage", "pressure"] if c in cleaned.columns]
cat_cols = [c for c in ["status", "batch_id", "location"] if c in cleaned.columns]

# 设备级中位数
if num_cols:
    dev_meds = cleaned.groupBy("device_id").agg(
        *[F.expr(f"percentile_approx({c}, 0.5, 10000) as {c}_med") for c in num_cols]
    )
    cleaned = cleaned.join(dev_meds, on="device_id", how="left")
    for c in num_cols:
        cleaned = cleaned.withColumn(c, F.when(F.col(c).isNull(), F.col(f"{c}_med")).otherwise(F.col(c))) \
                         .drop(f"{c}_med")
    # 全局中位数兜底
    global_meds = cleaned.agg(*[F.expr(f"percentile_approx({c}, 0.5, 10000) as {c}_gmed") for c in num_cols]).collect()[0]
    for c in num_cols:
        g = global_meds[f"{c}_gmed"]
        cleaned = cleaned.na.fill({c: float(g) if g is not None else 0.0})

# 类别众数(设备级 -> 全局)
for c in cat_cols:
    dev_mode = (
        cleaned.groupBy("device_id", c).count()
               .withColumn("rn", F.row_number().over(Window.partitionBy("device_id").orderBy(F.desc("count"))))
               .where(F.col("rn") == 1)
               .select("device_id", F.col(c).alias(f"{c}_mode"))
    )
    cleaned = cleaned.join(dev_mode, on="device_id", how="left")
    cleaned = cleaned.withColumn(c, F.coalesce(F.col(c), F.col(f"{c}_mode"), F.lit("UNKNOWN" if c=="status" else None))) \
                     .drop(f"{c}_mode")
    # 全局众数兜底
    gmode = (cleaned.groupBy(c).count().orderBy(F.desc("count")).limit(1).collect())
    if gmode:
        gval = gmode[0][0]
        cleaned = cleaned.na.fill({c: gval if gval is not None else ("UNKNOWN" if c=="status" else None)})

# 可选:KNN 补齐剩余缺失(谨慎使用,默认关闭)
if args.enable_knn and num_cols:
    # 这里提供设备内 KNN 的参考实现(对分钟级小样本较稳妥)
    # 对于极大规模与跨设备 KNN,不建议在生产上启用(计算成本高)
    @pandas_udf("device_id string, timestamp timestamp, " + ",".join([f"{c} double" for c in num_cols]), PandasUDFType.GROUPED_MAP)
    def knn_impute(pdf: pd.DataFrame) -> pd.DataFrame:
        from sklearn.impute import KNNImputer
        pdf = pdf.sort_values("timestamp")
        X = pdf[num_cols].values
        imputer = KNNImputer(n_neighbors=3, weights="distance")
        X_imputed = imputer.fit_transform(X)
        out = pd.DataFrame({"device_id": pdf["device_id"].values, "timestamp": pdf["timestamp"].values})
        for i, c in enumerate(num_cols):
            out[c] = X_imputed[:, i]
        return out

    base_cols = ["device_id", "timestamp"] + num_cols
    knned = cleaned.select(*[c for c in base_cols if c in cleaned.columns]).groupBy("device_id").apply(knn_impute)
    cleaned = cleaned.drop(*num_cols).join(knned, on=["device_id", "timestamp"], how="left")

# 分位数变换至 U(0,1)(Piecewise 线性近似 ECDF)
# 计算每列分位点
breaks = {}
if num_cols:
    probs = [i / args.quantile_bins for i in range(args.quantile_bins + 1)]
    for c in num_cols:
        qvals = cleaned.approxQuantile(c, probs, args.rel_error)
        # 去重/单调修正,保证非降
        qvals = np.maximum.accumulate(np.array(qvals, dtype=float))
        breaks[c] = (qvals.tolist(), probs)

    # 为每列构造 UDF 并变换
    make_udf = piecewise_linear_udf_factory(breaks)
    for c in num_cols:
        ufun = make_udf(c)
        cleaned = cleaned.withColumn(f"{c}_u", ufun(F.col(c)))

# 分区字段:日期(UTC)
cleaned = cleaned.withColumn("ds", F.to_date(F.col("timestamp")))

# 写出(Parquet,按设备与日期分区)
cleaned.write.mode("overwrite").partitionBy("device_id", "ds").parquet(args.output)

# 记录元数据:分位点、参数等(可选:落盘到同一路径下的 _meta/)
meta_path = args.output.rstrip("/") + "/_meta"
# 保存每列分位点为 JSON(仅示例)
if breaks:
    meta_rows = []
    for c, (qvals, probs) in breaks.items():
        meta_rows.append((c, qvals, probs))
    meta_df = spark.createDataFrame(meta_rows, schema="col string, qvals array<double>, probs array<double>")
    meta_df.write.mode("overwrite").json(meta_path + "/quantile_breaks")

spark.stop()

if name == "main": main()

实现要点与理由:

  • 时区统一:使用 to_utc_timestamp,确保所有记录采用一致 UTC 基准。
  • 温度单位统一:设备级中位数阈值启发式(>200 视为 K),全量转换至摄氏度;实际生产建议使用设备元数据映射以避免误判。
  • 尖峰检测:Hampel(滚动中位数+MAD)对重尾与尖峰鲁棒,window=61(约一分钟)能在秒级数据上稳定滤除短脉冲。
  • 插值与分段:通过负跳变或大间隔(>600 秒)切段,避免跨重启/丢包块插值导致虚假趋势;段内先线性插值,再前后向填充修复短缺口。
  • 重采样:分钟级聚合采用中位数抑制残余异常;类别列使用分钟内众数以保留主导状态。
  • 缺失策略:符合要求的三类策略顺序执行。KNN 作为可选补齐,限制在设备级分钟序列上以控制计算规模。
  • 分位数变换:全局近似分位点 + 分段线性插值,得到近似 U(0,1) 的均匀分布表征,兼容线性与非参数模型;bins 建议 256~2048 视数据规模与精度折中。

性能建议:

  • 优先使用 Parquet 输入/输出并按 device_id、ds 分区,便于下游增量处理与裁剪读取。
  • 在集群上调大 shuffle 分区数与并行度;对超大设备数量,可在 groupBy.apply 前先按设备哈希重分区。
  • 若设备数量很大且单设备数据量仍很大,可缩小 Hampel 窗口或将清洗、重采样拆为两步作业。

示例详情

该提示词已被收录:
“AI工程师必备:高效建模与数据处理提示词合集”
覆盖建模到评估关键环节,助你快速构建高性能模型
√ 立即可用 · 零学习成本
√ 参数化批量生成
√ 专业提示词工程师打磨

解决的问题

帮助数据与业务团队快速把“杂乱原始数据”变成“可直接分析的标准数据”。通过一次精准提问,自动生成贴合你数据特征的Python标准化脚本,覆盖缺失值处理、异常值规整、数值缩放、类别字段统一、时间格式规范等常见清洗环节;同时输出清晰的使用步骤与注意事项。让数据准备时间从数小时缩短到数分钟,提升口径一致性与数据质量,减少对资深工程资源的依赖,加速报表出数与策略验证,最终推动效率提升与更稳的业务决策。

适用用户

数据分析师

快速生成统一的标准化脚本,批量处理多份数据,缩短探索与建模准备时间,提升结论可靠性。

商业智能工程师

为报表与仪表盘建立一致的数据口径,自动清洗与编码,减少重复劳动与口径争议,稳定上线节奏。

科研人员

以可复现脚本记录预处理过程,支持多次实验对照,保障论文与项目数据的一致性与可信度。

特征总结

一键生成可执行的标准化脚本,按数据特征自动配置处理步骤与清晰注释。
自动识别数值、类别与时间字段,匹配缩放、编码、清洗策略,减少手动试错。
灵活参数化:缺失值填充、异常处理、编码方式可选,一次设定,多场景复用。
按目标场景推荐方案:模型训练、报表、可视化皆可用,兼顾效果与可解释。
生成校验报告与提示,定位问题列与异常分布,上线前就能发现潜在风险。
支持多语言说明与注释,跨团队共享更顺畅,降低沟通成本与交付时间。
自动输出可复现步骤清单,便于审计、版本对比与知识沉淀,确保一致口径。
与下游建模流程衔接顺畅,数据即刻可用,显著压缩准备与迭代周期。
可自定义模板与参数,适配行业规则与数据口径,标准化策略一键切换。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥25.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 278 tokens
- 5 个可调节参数
{ 数据集特征 } { 标准化方法 } { 缺失值处理策略 } { 输出脚本格式要求 } { 特定字段处理规则 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59