Data Validation Script Writing Guide

Updated Sep 27, 2025

Generate Python scripts that validate data integrity, precisely and efficiently.

Below is a ready-to-run Python script that uses pandas to validate the integrity of an orders table plus a users table, exporting both detailed issue files and a summary. Checks covered: primary/foreign-key consistency, amount ranges, channel enumeration, date validity with timezone normalization, and missing and duplicate values.

Notes

  • Input: two CSV files (orders.csv, users.csv); the paths can be changed in the configuration block.
  • Output: a summary report on the console plus several issue-detail CSVs under the output/ directory.
  • Merge: orders are left-joined to users to detect orphan orders (foreign-key mismatches).
  • Timezone: timestamps that include a time or a timezone are normalized to Asia/Shanghai before the date is taken; plain date strings are not shifted. (A snippet for generating minimal test files follows this list.)
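
To run the script end-to-end without real data, the sketch below writes two tiny test files matching the expected columns; every value is invented, and a few problems (an orphan order, a bad channel, a negative amount, an empty currency) are planted on purpose.

# make_test_data.py -- write tiny orders.csv / users.csv samples (all values are illustrative)
import pandas as pd

orders = pd.DataFrame({
    "order_id":    ["1001", "1002", "1003"],
    "order_date":  ["2025-09-01", "2025-09-02T10:30:00+08:00", "2025-09-03 09:15:00"],
    "customer_id": ["C001", "C002", "C999"],             # C999 has no matching user -> orphan order
    "amount":      ["199.00", "-5", "abc"],              # negative and non-numeric amounts
    "channel":     ["web", "APP", "email"],              # "email" is not an allowed channel
    "currency":    ["CNY", "", "USD"],                   # empty currency -> default not applied
})
users = pd.DataFrame({
    "customer_id": ["C001", "C002"],
    "signup_date": ["2025-08-15", "2025-08-20T08:00:00Z"],
    "region":      ["east", "north"],
})
orders.to_csv("orders.csv", index=False)
users.to_csv("users.csv", index=False)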

Script

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import sys
from datetime import date, datetime, timedelta

import numpy as np
import pandas as pd
import pytz


############################
# 配置
############################
ORDERS_CSV = "orders.csv"   # 订单表CSV路径,要求包含列:order_id, order_date, customer_id, amount, channel, currency
USERS_CSV  = "users.csv"    # 用户表CSV路径,要求包含列:customer_id, signup_date, region
OUTPUT_DIR = "output"       # 问题明细输出目录
TARGET_TZ  = "Asia/Shanghai" # 日期时间归一的目标时区
ALLOWED_CHANNELS = {"web", "app"}  # 渠道枚举

# 日期合理范围(可按需调整)
DATE_MIN = pd.to_datetime("1970-01-01").date()
DATE_MAX = (datetime.now(pytz.timezone(TARGET_TZ)) + timedelta(days=365)).date()  # 未来一年的容忍


############################
# 工具函数
############################
def ensure_output_dir(path: str):
    os.makedirs(path, exist_ok=True)


def read_csv_as_strings(csv_path: str, required_cols: set):
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"未找到文件: {csv_path}")

    # 统一以字符串读取,避免前导零丢失/科学计数等问题;后续再做严格类型校验
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False, na_values=["", "NA", "NaN", "null", "NULL"])
    # 统一列名小写
    df.columns = [c.strip().lower() for c in df.columns]
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"文件 {csv_path} 缺少必需列: {missing}")

    # 去除字符串首尾空白
    for c in df.columns:
        if pd.api.types.is_string_dtype(df[c]):
            df[c] = df[c].str.strip()
    return df


def to_nullable_int(series: pd.Series):
    # 允许空值;验证为整数(禁止小数)
    s = series.copy()
    # 去掉逗号/空格
    s = s.str.replace(",", "", regex=False)
    # 合法整数(可含符号),用正则先行筛查,避免 1.0 之类误判
    mask_nonempty = s.notna() & (s != "")
    mask_intlike = s.str.match(r"^[+-]?\d+$", na=False)
    out = pd.Series(pd.NA, index=s.index, dtype="Int64")
    out.loc[mask_nonempty & mask_intlike] = pd.to_numeric(s[mask_nonempty & mask_intlike], errors="coerce").astype("Int64")
    return out


def to_nullable_decimal(series: pd.Series):
    s = series.copy()
    s = s.str.replace(",", "", regex=False)
    # 允许小数;返回float64并保留NaN
    out = pd.to_numeric(s, errors="coerce")
    return out


def normalize_channel(series: pd.Series):
    s = series.str.lower().str.strip()
    return s


def parse_to_local_date(series: pd.Series, target_tz: str):
    """
    规则:
    - 纯日期(YYYY-MM-DD)直接解析为日期;
    - 含时间戳:
        * 含时区偏移/标记:转为UTC再转换到目标时区,最后取date;
        * 无时区:按目标时区本地化,再取date。
    返回:
    - dates: pd.Series of python date (或 NaT),dtype=object
    - meta: dict,包含计数统计
    """
    tz = pytz.timezone(target_tz)
    raw = series.astype("string").str.strip()

    mask_empty = raw.isna() | (raw == "")
    mask_date_only = raw.str.match(r"^\d{4}-\d{2}-\d{2}$", na=False)
    mask_has_tz = raw.str.contains(r"(?:Z|[+-]\d{2}:\d{2}|[+-]\d{4}|UTC|GMT)", case=False, na=False)  # 非捕获组,避免 pandas 的 match-group 警告
    mask_timestamp = ~mask_empty & ~mask_date_only

    dates = pd.Series(pd.NaT, index=raw.index, dtype="object")  # 保存为 python date 或 NaT

    # 纯日期
    if mask_date_only.any():
        d = pd.to_datetime(raw[mask_date_only], format="%Y-%m-%d", errors="coerce")
        dates.loc[mask_date_only] = d.dt.date

    # 含时区的时间戳
    mask_ts_tz = mask_timestamp & mask_has_tz
    if mask_ts_tz.any():
        dt_utc = pd.to_datetime(raw[mask_ts_tz], errors="coerce", utc=True)
        dt_local = dt_utc.dt.tz_convert(tz)
        dates.loc[mask_ts_tz] = dt_local.dt.date

    # 无时区的时间戳 -> 本地化到目标时区
    mask_ts_naive = mask_timestamp & (~mask_has_tz)
    if mask_ts_naive.any():
        dt_naive = pd.to_datetime(raw[mask_ts_naive], errors="coerce", utc=False)
        try:
            dt_localized = dt_naive.dt.tz_localize(tz)
        except TypeError:
            # 若 dt_naive 已带tz(极端混合情况),退化处理
            dt_localized = dt_naive
            dt_localized = dt_localized.dt.tz_convert(tz)
        dates.loc[mask_ts_naive] = dt_localized.dt.date

    meta = {
        "empty": int(mask_empty.sum()),
        "date_only": int(mask_date_only.sum()),
        "timestamp_with_tz": int(mask_ts_tz.sum()),
        "timestamp_naive": int(mask_ts_naive.sum()),
        "parsed_na": int(pd.isna(dates).sum())
    }
    return dates, meta


def regex_iso_currency(series: pd.Series):
    # 简单校验货币代码格式(ISO-4217 形态):3位大写字母
    s = series.astype("string")
    return s.str.fullmatch(r"[A-Z]{3}", na=False)


############################
# 验证逻辑
############################
def validate_orders(df_orders: pd.DataFrame):
    issues = {}

    # 1) 主键与必填约束
    df = df_orders.copy()
    df["order_id_int"] = to_nullable_int(df["order_id"])
    issues["order_id_null"] = df[df["order_id_int"].isna()][["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]
    # 主键重复
    dup_order = df["order_id_int"].duplicated(keep=False) & df["order_id_int"].notna()
    issues["order_pk_duplicates"] = df[dup_order][["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]

    # 2) 非空约束:customer_id
    issues["order_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][["order_id", "customer_id"]]

    # 3) 金额 decimal >= 0
    df["amount_num"] = to_nullable_decimal(df["amount"])
    issues["amount_not_numeric"] = df[df["amount"].notna() & (df["amount"] != "") & df["amount_num"].isna()][["order_id", "amount"]]
    issues["amount_negative"] = df[df["amount_num"].notna() & (df["amount_num"] < 0)][["order_id", "amount", "amount_num"]]

    # 4) 渠道枚举
    df["channel_norm"] = normalize_channel(df["channel"])
    issues["channel_null"] = df[df["channel_norm"].isna() | (df["channel_norm"] == "")][["order_id", "channel"]]
    issues["channel_invalid_enum"] = df[df["channel_norm"].notna() & (df["channel_norm"] != "") & (~df["channel_norm"].isin(ALLOWED_CHANNELS))][["order_id", "channel"]]

    # 5) 货币默认/格式(默认CNY:若为空,视为未正确应用默认;非空不限定取值)
    issues["currency_missing_should_default"] = df[df["currency"].isna() | (df["currency"] == "")][["order_id", "currency"]]
    # 可选:检查格式是否符合3位大写(信息级)
    fmt_mask = regex_iso_currency(df["currency"])
    issues["currency_format_non_iso_like_info"] = df[(df["currency"].notna()) & (df["currency"] != "") & (~fmt_mask)][["order_id", "currency"]]

    # 6) 日期合法与时区归一
    order_dates, meta_order = parse_to_local_date(df["order_date"], TARGET_TZ)
    df["order_date_norm"] = order_dates
    issues["order_date_unparseable"] = df[pd.isna(df["order_date_norm"]) & df["order_date"].notna() & (df["order_date"] != "")][["order_id", "order_date"]]
    # 日期范围
    mask_order_range_invalid = (~pd.isna(df["order_date_norm"])) & ((df["order_date_norm"] < DATE_MIN) | (df["order_date_norm"] > DATE_MAX))
    issues["order_date_out_of_range"] = df[mask_order_range_invalid][["order_id", "order_date", "order_date_norm"]]

    # 7) 订单表全行重复(可能来自重复导出)
    issues["order_fullrow_duplicates"] = df[df.duplicated(subset=["order_id", "order_date", "customer_id", "amount", "channel", "currency"], keep=False)]

    return df, issues, {"order_date_meta": meta_order}


def validate_users(df_users: pd.DataFrame):
    issues = {}
    df = df_users.copy()

    # 1) 主键与必填约束
    issues["user_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][["customer_id", "signup_date", "region"]]
    dup_user = df["customer_id"].duplicated(keep=False) & df["customer_id"].notna() & (df["customer_id"] != "")
    issues["user_pk_duplicates"] = df[dup_user][["customer_id", "signup_date", "region"]]

    # 2) 日期合法与时区归一
    signup_dates, meta_user = parse_to_local_date(df["signup_date"], TARGET_TZ)
    df["signup_date_norm"] = signup_dates
    issues["signup_date_unparseable"] = df[pd.isna(df["signup_date_norm"]) & df["signup_date"].notna() & (df["signup_date"] != "")][["customer_id", "signup_date"]]
    mask_signup_range_invalid = (~pd.isna(df["signup_date_norm"])) & ((df["signup_date_norm"] < DATE_MIN) | (df["signup_date_norm"] > DATE_MAX))
    issues["signup_date_out_of_range"] = df[mask_signup_range_invalid][["customer_id", "signup_date", "signup_date_norm"]]

    return df, issues, {"signup_date_meta": meta_user}


def check_foreign_keys(df_orders_norm: pd.DataFrame, df_users_norm: pd.DataFrame):
    # 左连接检出订单孤儿(外键不匹配)
    merged = df_orders_norm.merge(
        df_users_norm[["customer_id", "signup_date_norm", "region"]],
        on="customer_id",
        how="left",
        indicator=True,
        suffixes=("", "_user")
    )
    orphan = merged[merged["_merge"] == "left_only"]
    # 可选的业务一致性:下单日期 >= 注册日期(若两者都有效)
    has_both = merged["order_date_norm"].notna() & merged["signup_date_norm"].notna()
    violates_chronology = merged[has_both & (merged["order_date_norm"] < merged["signup_date_norm"])]

    fk_issues = {
        "fk_orphan_orders": orphan[["order_id", "customer_id", "order_date_norm", "signup_date_norm", "region"]],
        "order_before_signup": violates_chronology[["order_id", "customer_id", "order_date_norm", "signup_date_norm"]]
    }
    return merged, fk_issues


def save_issues(issues: dict, prefix: str):
    ensure_output_dir(OUTPUT_DIR)
    for name, df in issues.items():
        if df is None or len(df) == 0:
            continue
        out_path = os.path.join(OUTPUT_DIR, f"{prefix}__{name}.csv")
        df.to_csv(out_path, index=False, encoding="utf-8-sig")


def print_summary(total_orders: int, total_users: int, issues_all: dict, metas: dict):
    print("=== 数据完整性验证汇总 ===")
    print(f"- 订单表记录数: {total_orders}")
    print(f"- 用户表记录数: {total_users}")

    # 日期解析元信息
    if "order_date_meta" in metas:
        m = metas["order_date_meta"]
        print(f"- 订单日期解析: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")
    if "signup_date_meta" in metas:
        m = metas["signup_date_meta"]
        print(f"- 注册日期解析: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")

    # 统计各类问题计数
    print("\n=== 问题计数 ===")
    for section, sec_issues in issues_all.items():
        for name, df in sec_issues.items():
            cnt = 0 if df is None else len(df)
            print(f"[{section}] {name}: {cnt}")

    print(f"\n明细已输出到目录: {OUTPUT_DIR}\n")


def main():
    try:
        # 读取
        orders_req = {"order_id", "order_date", "customer_id", "amount", "channel", "currency"}
        users_req  = {"customer_id", "signup_date", "region"}
        df_orders_raw = read_csv_as_strings(ORDERS_CSV, orders_req)
        df_users_raw  = read_csv_as_strings(USERS_CSV, users_req)

        # 验证各自表
        df_orders_norm, order_issues, order_meta = validate_orders(df_orders_raw)
        df_users_norm, user_issues, user_meta = validate_users(df_users_raw)

        # 外键与合并
        merged, fk_issues = check_foreign_keys(df_orders_norm, df_users_norm)

        # 汇总与输出
        save_issues(order_issues, "orders")
        save_issues(user_issues, "users")
        save_issues(fk_issues, "fk")

        # 输出已归一化(合并后)样例(可选)
        sample_out = os.path.join(OUTPUT_DIR, "merged_sample.csv")
        merged.head(1000).to_csv(sample_out, index=False, encoding="utf-8-sig")

        # 汇总打印
        issues_all = {"orders": order_issues, "users": user_issues, "merge_fk": fk_issues}
        metas = {"order_date_meta": order_meta["order_date_meta"], "signup_date_meta": user_meta["signup_date_meta"]}
        print_summary(len(df_orders_raw), len(df_users_raw), issues_all, metas)

        # 以退出码标识是否存在“严重问题”
        severe_counts = 0
        severe_counts += len(order_issues["order_id_null"])
        severe_counts += len(order_issues["order_pk_duplicates"])
        severe_counts += len(order_issues["order_customer_id_null"])
        severe_counts += len(order_issues["amount_negative"])
        severe_counts += len(order_issues["channel_null"]) + len(order_issues["channel_invalid_enum"])
        severe_counts += len(order_issues["order_date_unparseable"])
        severe_counts += len(user_issues["user_customer_id_null"])
        severe_counts += len(user_issues["user_pk_duplicates"])
        severe_counts += len(user_issues["signup_date_unparseable"])
        severe_counts += len(fk_issues["fk_orphan_orders"])

        if severe_counts > 0:
            sys.exit(2)
        else:
            sys.exit(0)

    except Exception as e:
        print(f"运行失败: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()

Usage

  • Prepare orders.csv and users.csv with the column names expected by the script (case-insensitive; the script lowercases them).
  • Adjust the configuration block at the top of the script as needed (file paths, target timezone, date range, and so on).
  • Run: python validate_integrity.py
  • Output:
    • A summary is printed to the console.
    • A detail CSV is written to the output/ directory for each issue class, for example:
      • orders__order_pk_duplicates.csv (duplicate order primary keys)
      • fk__fk_orphan_orders.csv (orders whose foreign key has no match)
      • orders__channel_invalid_enum.csv (invalid channel values)
      • orders__order_date_unparseable.csv / users__signup_date_unparseable.csv (unparseable dates)
    • merged_sample.csv holds the first 1000 rows of the merged result for spot checks. (A small helper for reviewing these files follows this list.)
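
As a quick way to review what was exported, a minimal sketch (assuming the default output/ directory) that lists each detail file with its row count:

# inspect_output.py -- print the row count of each issue CSV under output/
import glob
import os

import pandas as pd

for path in sorted(glob.glob(os.path.join("output", "*.csv"))):
    print(f"{os.path.basename(path)}: {len(pd.read_csv(path))} rows")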

Validation highlights

  • Primary/foreign keys: order_id must be non-null and unique in the orders table; customer_id must be non-null and unique in the users table; every customer_id in orders must exist in the users table.
  • Amount: if present, the value must be numeric and >= 0; nulls are allowed (the stated constraints do not require the field to be non-null).
  • Channel: only web and app are allowed; values are lowercased before comparison, so matching is effectively case-insensitive.
  • Currency: the field defaults to CNY. Empty values are treated as the default not having been applied and reported as errors; non-empty values are not restricted to an enumeration (multiple currencies are allowed). A format check (three uppercase letters) is included as an informational hint.
  • Dates and timezones:
    • Plain date strings are parsed as dates;
    • Timestamps with a timezone are normalized to UTC and then converted to the target timezone;
    • Timestamps without a timezone are localized to the target timezone;
    • Everything is reduced to a local date for cross-table comparison and range checks;
    • Unparseable values and out-of-range dates are exported as detail files.
  • Duplicates: in addition to primary-key duplicates, full-row duplicates are exported to surface repeated exports or repeated ingestion.
  • Exit code: the process exits with code 2 when severe issues are found, so CI or a scheduler can block automatically (a minimal sketch follows this list).
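
A minimal sketch of how a CI step or scheduler might react to that exit code; the file name validate_integrity.py follows the usage section above.

# run_validation.py -- invoke the validator and branch on its exit code
import subprocess
import sys

result = subprocess.run([sys.executable, "validate_integrity.py"])
if result.returncode == 2:
    print("Severe data-integrity issues found; blocking downstream steps.", file=sys.stderr)
    sys.exit(1)
if result.returncode == 1:
    print("Validator failed to run; check its stderr output.", file=sys.stderr)
    sys.exit(1)
print("Data-integrity validation passed.")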

Below is a self-contained PySpark script that deduplicates the described dataset and validates partition completeness, time-window bounds, enum validity, JSON structure, and required non-null fields. It produces two outputs: a “clean” deduplicated dataset and an “invalid” dataset with violation reasons. It can also optionally fail the job if expected partitions are missing.

Assumptions:

  • Input is a partitioned Parquet dataset with column names: event_id (string), event_time (timestamp), user_id (string), event_type (string), properties (string containing JSON), dt (date partition).
  • Enum values are case-insensitive; we normalize to lowercase before validation.
  • Required non-null fields: event_id, event_time, dt.
  • JSON field “properties” is nullable. If present, it must be a valid JSON object. Required JSON keys are configurable.
  • Time window rule: to_date(event_time) must be within [dt - late_lag_days, dt + future_drift_days] (inclusive).

You can adjust configuration via command-line arguments.
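
To try the job locally before pointing it at real data, the sketch below (all values invented for illustration; the local path ./events_parquet is arbitrary) writes a tiny dt-partitioned dataset with the assumed columns:

# make_test_events.py -- write a tiny dt-partitioned Parquet dataset matching the assumed schema
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("make-test-events").getOrCreate()

rows = [
    ("e1", "2025-09-20 10:00:00", "u1", "view",     '{"item_id": "A", "price": "9.9"}', "2025-09-20"),
    ("e1", "2025-09-20 10:05:00", "u1", "view",     '{"item_id": "A", "price": "9.9"}', "2025-09-20"),  # duplicate event_id
    ("e2", "2025-09-25 08:00:00", "u2", "refund",   None,                               "2025-09-21"),  # bad enum, outside window
    ("e3", "2025-09-22 12:00:00", "u3", "purchase", "not-json",                         "2025-09-22"),  # invalid JSON
]
df = spark.createDataFrame(
    rows, "event_id string, event_time string, user_id string, event_type string, properties string, dt string"
)
df = df.withColumn("event_time", df.event_time.cast("timestamp")).withColumn("dt", df.dt.cast("date"))
df.write.mode("overwrite").partitionBy("dt").parquet("./events_parquet")
spark.stop()

Running the validator against this sample with, for example, --input-path ./events_parquet --start-dt 2025-09-20 --end-dt 2025-09-23 should report one missing partition (2025-09-23) and flag the duplicate, enum, time-window, and JSON problems planted above.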

Python script (PySpark):

#!/usr/bin/env python3
import argparse
import sys
from datetime import datetime, date, timedelta

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, lower, trim, to_date, date_add, date_sub, when, lit,
    array, array_union, size, from_json, element_at, row_number
)
from pyspark.sql.types import StringType, MapType

def parse_args():
    p = argparse.ArgumentParser(description="ETL integrity validation for event dataset")
    p.add_argument("--input-path", required=True, help="Input dataset path (Parquet), partitioned by dt")
    p.add_argument("--output-valid-path", required=True, help="Output path for valid, deduplicated dataset")
    p.add_argument("--output-invalid-path", required=True, help="Output path for invalid records with reasons")
    p.add_argument("--start-dt", required=True, help="Expected partition range start (YYYY-MM-DD)")
    p.add_argument("--end-dt", required=True, help="Expected partition range end (YYYY-MM-DD)")
    p.add_argument("--allowed-event-types", default="view,click,purchase", help="Comma-separated allowed event types")
    p.add_argument("--late-lag-days", type=int, default=1, help="Max allowed lateness (days) wrt dt")
    p.add_argument("--future-drift-days", type=int, default=0, help="Max allowed future drift (days) wrt dt")
    p.add_argument("--required-json-keys", default="", help="Comma-separated keys required within properties JSON (if properties is not null)")
    p.add_argument("--fail-on-partition-missing", action="store_true", help="Fail job if any expected dt partitions are missing")
    p.add_argument("--repartition", type=int, default=0, help="Optional number of partitions for output")
    return p.parse_args()

def daterange(start_dt: date, end_dt: date):
    d = start_dt
    out = []
    while d <= end_dt:
        out.append(d)
        d += timedelta(days=1)
    return out

def main():
    args = parse_args()

    start_dt = datetime.strptime(args.start_dt, "%Y-%m-%d").date()
    end_dt = datetime.strptime(args.end_dt, "%Y-%m-%d").date()
    if start_dt > end_dt:
        print("Error: start-dt must be <= end-dt", file=sys.stderr)
        sys.exit(2)

    allowed_event_types = [s.strip().lower() for s in args.allowed_event_types.split(",") if s.strip()]
    required_json_keys = [s.strip() for s in args.required_json_keys.split(",") if s.strip()]

    spark = (
        SparkSession.builder
        .appName("event-integrity-validation")
        .getOrCreate()
    )

    # Read input dataset
    df = spark.read.parquet(args.input_path)

    # Normalize types and values
    # - ensure event_type lowercase for enum validation
    # - ensure dt is date, event_time is timestamp
    df = df \
        .withColumn("event_type_norm", lower(trim(col("event_type")))) \
        .withColumn("dt", to_date(col("dt"))) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \
        .withColumn("event_id", col("event_id").cast(StringType())) \
        .withColumn("user_id", col("user_id").cast(StringType())) \
        .withColumn("properties", col("properties").cast(StringType()))

    # Deduplicate by primary key event_id:
    # Keep the row with the latest event_time; on ties, keep the lexicographically greatest JSON string to stabilize the choice.
    # Mark dropped duplicates for invalid reporting.
    w = Window.partitionBy("event_id").orderBy(
        col("event_time").desc_nulls_last(),
        col("properties").desc_nulls_last()
    )
    df_with_rownum = df.withColumn("rn", row_number().over(w))
    canonical = df_with_rownum.filter(col("rn") == 1).drop("rn")
    duplicates_removed = df_with_rownum.filter(col("rn") > 1).drop("rn") \
        .withColumn("violations", array(lit("duplicate_removed")))

    # Core validations on canonical set

    # Required non-null fields
    required_viol = array()
    required_viol = when(col("event_id").isNull(), array_union(required_viol, array(lit("missing_required:event_id")))).otherwise(required_viol)
    required_viol = when(col("event_time").isNull(), array_union(required_viol, array(lit("missing_required:event_time")))).otherwise(required_viol)
    required_viol = when(col("dt").isNull(), array_union(required_viol, array(lit("missing_required:dt")))).otherwise(required_viol)

    # Enum validation (event_type cannot be null and must be in allowed set)
    enum_valid = col("event_type_norm").isin(allowed_event_types)
    enum_viol = when(col("event_type_norm").isNull(), array(lit("invalid_enum:null"))) \
        .otherwise(when(enum_valid, array()).otherwise(array(lit("invalid_enum:event_type"))))

    # Time window validation:
    # Allowed event_time date must be within [dt - late_lag_days, dt + future_drift_days]
    event_date = to_date(col("event_time"))
    min_allowed_date = date_sub(col("dt"), args.late_lag_days)
    max_allowed_date = date_add(col("dt"), args.future_drift_days)
    time_viol = when(
        col("event_time").isNull(), array(lit("time_window:event_time_null"))
    ).otherwise(
        when((event_date < min_allowed_date) | (event_date > max_allowed_date),
             array(lit("time_window:out_of_range"))).otherwise(array())
    )

    # JSON validation: properties is nullable; if present must be a valid JSON object.
    # We parse as Map<String,String> for structural validation. If parse fails => null.
    json_schema = MapType(StringType(), StringType(), valueContainsNull=True)  # MapType's third argument is valueContainsNull
    parsed_props = from_json(trim(col("properties")), json_schema)
    json_viol = when(
        col("properties").isNull(), array()
    ).otherwise(
        when(parsed_props.isNull(), array(lit("invalid_json:parse_error"))).otherwise(array())
    )

    # Required JSON keys (if provided): keys must exist and be non-empty when properties is not null
    if required_json_keys:
        # For each required key, check existence and non-empty value
        for k in required_json_keys:
            json_viol = when(
                col("properties").isNull(), json_viol  # no requirement if properties is null
            ).otherwise(
                when(
                    (parsed_props.isNull()) | (element_at(parsed_props, lit(k)).isNull()) | (trim(element_at(parsed_props, lit(k))) == ""),
                    array_union(json_viol, array(lit(f"missing_json_key:{k}")))
                ).otherwise(json_viol)
            )

    # Aggregate violations for canonical records
    violations = array()
    violations = array_union(violations, required_viol)
    violations = array_union(violations, enum_viol)
    violations = array_union(violations, time_viol)
    violations = array_union(violations, json_viol)

    canonical_with_viol = canonical.withColumn("violations", violations)

    # Split valid vs invalid
    invalid_canonical = canonical_with_viol.filter(size(col("violations")) > 0)
    valid_canonical = canonical_with_viol.filter(size(col("violations")) == 0).drop("violations")

    # Combine invalid sets (duplicates + invalid canonical)
    invalid_all = invalid_canonical.unionByName(duplicates_removed, allowMissingColumns=True)

    # Partition completeness check
    expected_dates = [d.strftime("%Y-%m-%d") for d in daterange(start_dt, end_dt)]
    actual_dates = [r["dt"].strftime("%Y-%m-%d") for r in df.select("dt").distinct().collect() if r["dt"] is not None]
    expected_set = set(expected_dates)
    actual_set = set(actual_dates)
    missing_partitions = sorted(expected_set - actual_set)

    # Write outputs
    if args.repartition > 0:
        valid_canonical = valid_canonical.repartition(args.repartition, col("dt"))
        invalid_all = invalid_all.repartition(args.repartition)

    # Write valid dataset partitioned by dt to preserve downstream partitioning
    valid_canonical.write.mode("overwrite").partitionBy("dt").parquet(args.output_valid_path)
    invalid_all.write.mode("overwrite").parquet(args.output_invalid_path)

    # Metrics and summary
    total_in = df.count()
    total_valid = valid_canonical.count()
    total_invalid = invalid_all.count()
    total_duplicates = duplicates_removed.count()

    print("Validation Summary")
    print(f"- Input rows: {total_in}")
    print(f"- Valid (deduplicated) rows: {total_valid}")
    print(f"- Invalid rows: {total_invalid}")
    print(f"- Duplicates removed: {total_duplicates}")
    print(f"- Expected dt partitions: {len(expected_set)} [{args.start_dt}..{args.end_dt}]")
    print(f"- Actual dt partitions: {len(actual_set)}")
    if missing_partitions:
        print(f"- Missing dt partitions ({len(missing_partitions)}): {', '.join(missing_partitions)}")
        if args.fail_on_partition_missing:
            print("Failing due to missing partitions.", file=sys.stderr)
            spark.stop()
            sys.exit(1)
    else:
        print("- Partition completeness: OK")

    spark.stop()

if __name__ == "__main__":
    main()

Usage example:

  • Validate a dataset on S3 with a 7-day completeness window, allowing 1 day of late arrival and no future drift; require the properties keys “item_id” and “price” when properties is present; fail if any partition is missing.
python validate_events.py \
  --input-path s3://bucket/events_parquet/ \
  --output-valid-path s3://bucket/events_valid/ \
  --output-invalid-path s3://bucket/events_invalid/ \
  --start-dt 2025-09-20 \
  --end-dt 2025-09-26 \
  --allowed-event-types view,click,purchase \
  --late-lag-days 1 \
  --future-drift-days 0 \
  --required-json-keys item_id,price \
  --fail-on-partition-missing \
  --repartition 200

Notes:

  • Deduplication retains a single canonical row per event_id (latest event_time). Removed duplicates are captured in the invalid output with reason “duplicate_removed”; a snippet for summarizing the invalid output by reason follows these notes.
  • The JSON structure check ensures the “properties” field, if present, parses into an object (map). If parsing fails, the record is flagged.
  • Time-window validation operates on event_date relative to dt; adjust late_lag_days/future_drift_days to fit your pipeline’s allowed lateness.
  • The script prints a concise summary and can hard-fail on missing partitions for strong SLA enforcement.
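
To see which rules trip most often, a short follow-up sketch (assuming the invalid output path from the example above) that explodes the violations array and counts records per reason:

# inspect_invalid.py -- summarize invalid records by violation reason
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName("inspect-invalid").getOrCreate()
invalid = spark.read.parquet("s3://bucket/events_invalid/")  # the --output-invalid-path used above
(invalid
    .select(explode(col("violations")).alias("reason"))
    .groupBy("reason")
    .count()
    .orderBy(col("count").desc())
    .show(truncate=False))
spark.stop()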

Below is a self-contained Python script that validates the integrity of a campaign reporting dataset. It checks schema conformance, funnel consistency (conversion <= click <= impression), date intervals, missing and anomalous values, channel enumeration, and currency consistency (if a currency column is present or specified).

You can save it as validate_campaign_report.py and run: python validate_campaign_report.py --input path/to/data.csv --output path/to/validation_report.csv --currency-col currency --expected-currency USD

If no currency column is specified and none can be auto-detected, the currency check is skipped and reported as such in the summary.
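
For a quick dry run of the full script, the sketch below writes a small sample file with the required columns and a few deliberate violations; every value is invented.

# make_test_campaigns.py -- write a small sample campaign report CSV (all values are illustrative)
import pandas as pd

pd.DataFrame({
    "campaign_id": ["CMP-1", "CMP-2", ""],
    "channel":     ["email", "sms", "ad"],                       # "sms" violates the enum
    "impression":  [1000, 500, 200],
    "click":       [120, 600, 20],                               # 600 > 500 breaks funnel consistency
    "conversion":  [10, 5, 2],
    "cost":        [99.5, -1.0, 10.0],                           # negative cost
    "start_date":  ["2025-09-01", "2025-09-10", "2025-09-05"],
    "end_date":    ["2025-09-07", "2025-09-08", "2025-09-06"],   # CMP-2 ends before it starts
    "currency":    ["USD", "USD", "EUR"],                        # the EUR row trips the currency check
}).to_csv("campaign_report.csv", index=False)

The full validation script: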

#!/usr/bin/env python3
"""
Validate campaign reporting dataset integrity.

Required columns:
- campaign_id: string (non-null)
- channel: enum in {"email", "ad", "app"}
- impression: int >= 0
- click: int >= 0
- conversion: int >= 0
- cost: decimal >= 0
- start_date: date
- end_date: date

Checks:
- Missing values in required fields
- Type and range: non-negative, integer for counts
- Funnel consistency: conversion <= click <= impression
- Date parsing and interval: start_date <= end_date
- Channel enumeration correctness
- Currency consistency (if currency column provided or detected):
    - Non-null currency values
    - Single unique currency or match expected currency

Outputs a CSV of row-level errors and prints a summary to stdout.
"""

import argparse
import sys
import pandas as pd
import numpy as np
from typing import Dict, List, Optional, Tuple

REQUIRED_COLUMNS = [
    "campaign_id", "channel", "impression", "click",
    "conversion", "cost", "start_date", "end_date"
]
ALLOWED_CHANNELS = {"email", "ad", "app"}

def coerce_numeric(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_numeric, is_valid_numeric) with NaN for invalid entries."""
    s = pd.to_numeric(df[col], errors="coerce")
    valid = ~s.isna()
    return s, valid

def coerce_date(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_datetime, is_valid_date) with NaT for invalid entries."""
    s = pd.to_datetime(df[col], errors="coerce", utc=False)  # infer_datetime_format is deprecated in pandas 2.x
    valid = ~s.isna()
    return s, valid

def validate_campaign_report(
    df: pd.DataFrame,
    currency_col: Optional[str] = None,
    expected_currency: Optional[str] = None
) -> Dict:
    errors: List[Dict] = []

    # Normalize column names by stripping whitespace
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]

    # Check presence of required columns
    missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing_cols:
        return {
            "fatal": True,
            "message": f"Missing required columns: {missing_cols}",
            "errors": pd.DataFrame(columns=["row_index", "campaign_id", "field", "code", "message"]),
            "summary": {}
        }

    n_rows = len(df)

    # Validate campaign_id non-null and non-empty
    cid_null = df["campaign_id"].isna() | (df["campaign_id"].astype(str).str.strip() == "")
    for idx in df.index[cid_null]:
        errors.append({
            "row_index": idx,
            "campaign_id": None if pd.isna(df.loc[idx, "campaign_id"]) else str(df.loc[idx, "campaign_id"]),
            "field": "campaign_id",
            "code": "missing",
            "message": "campaign_id is null or empty"
        })

    # Validate channel enumeration and missing
    ch_null = df["channel"].isna() | (df["channel"].astype(str).str.strip() == "")
    for idx in df.index[ch_null]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "channel",
            "code": "missing",
            "message": "channel is null or empty"
        })
    # allowed values
    ch_str = df["channel"].astype(str).str.strip().str.lower()
    ch_invalid = ~ch_str.isin(ALLOWED_CHANNELS)
    # Skip invalid flag for rows already missing
    ch_invalid = ch_invalid & ~ch_null
    for idx in df.index[ch_invalid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "channel",
            "code": "enum_violation",
            "message": f"channel '{df.loc[idx, 'channel']}' not in {sorted(ALLOWED_CHANNELS)}"
        })

    # Numeric columns: impression, click, conversion, cost
    numeric_cols = ["impression", "click", "conversion", "cost"]
    parsed_numeric = {}
    valid_numeric = {}

    for col in numeric_cols:
        s_parsed, s_valid = coerce_numeric(df, col)
        parsed_numeric[col] = s_parsed
        valid_numeric[col] = s_valid

        # Missing or non-numeric
        invalid_mask = ~s_valid
        for idx in df.index[invalid_mask]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": col,
                "code": "type_error",
                "message": f"{col} is not numeric"
            })

        # Non-negative constraint
        nonneg_mask = s_parsed < 0
        for idx in df.index[nonneg_mask & s_valid]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": col,
                "code": "range_violation",
                "message": f"{col} must be >= 0"
            })

        # Integer constraint for count columns
        if col in {"impression", "click", "conversion"}:
            # Allow NaN failures already recorded; check integer-ness for valid values
            int_mask = s_parsed.dropna().apply(lambda x: float(x).is_integer())
            int_mask = int_mask.reindex(df.index, fill_value=False)
            non_integer = ~int_mask & s_valid
            for idx in df.index[non_integer]:
                errors.append({
                    "row_index": idx,
                    "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                    "field": col,
                    "code": "type_error",
                    "message": f"{col} must be an integer count"
                })

    # Funnel consistency: conversion <= click <= impression
    conv = parsed_numeric["conversion"]
    clk = parsed_numeric["click"]
    imp = parsed_numeric["impression"]
    # Only evaluate on rows where all three are valid and non-negative integers
    funnel_valid = valid_numeric["conversion"] & valid_numeric["click"] & valid_numeric["impression"]
    # Apply comparison
    funnel_violation = funnel_valid & ~((conv <= clk) & (clk <= imp))
    for idx in df.index[funnel_violation]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "funnel",
            "code": "consistency_error",
            "message": f"conversion({conv.loc[idx]}) <= click({clk.loc[idx]}) <= impression({imp.loc[idx]}) violated"
        })

    # Date columns
    start_parsed, start_valid = coerce_date(df, "start_date")
    end_parsed, end_valid = coerce_date(df, "end_date")

    for idx in df.index[~start_valid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "start_date",
            "code": "type_error",
            "message": "start_date is invalid or cannot be parsed"
        })
    for idx in df.index[~end_valid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "end_date",
            "code": "type_error",
            "message": "end_date is invalid or cannot be parsed"
        })

    # Date interval: start_date <= end_date
    date_interval_valid = start_valid & end_valid
    interval_violation = date_interval_valid & (start_parsed > end_parsed)
    for idx in df.index[interval_violation]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "date_range",
            "code": "range_violation",
            "message": f"start_date({start_parsed.loc[idx].date()}) > end_date({end_parsed.loc[idx].date()})"
        })

    # Currency consistency (optional)
    currency_check_status = "skipped"
    currency_unique_values = None
    if currency_col is None:
        # Try to auto-detect common currency column names
        for candidate in ["currency", "cost_currency"]:
            if candidate in df.columns:
                currency_col = candidate
                break

    if currency_col is not None and currency_col in df.columns:
        currency_check_status = "performed"
        cur_series = df[currency_col].astype(str).str.strip()
        cur_missing = cur_series.isna() | (cur_series == "") | (cur_series.str.lower() == "nan")
        for idx in df.index[cur_missing]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": currency_col,
                "code": "missing",
                "message": f"{currency_col} is null or empty"
            })

        currency_unique_values = sorted(cur_series[~cur_missing].unique().tolist())
        if expected_currency is not None:
            # Build the mask over the full index so it aligns with df.index
            mism = (~cur_missing) & (cur_series != expected_currency)
            for idx in df.index[mism]:
                errors.append({
                    "row_index": idx,
                    "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                    "field": currency_col,
                    "code": "consistency_error",
                    "message": f"Currency '{cur_series.loc[idx]}' != expected '{expected_currency}'"
                })
        else:
            # Require single currency across dataset
            if len(set(cur_series[~cur_missing])) > 1:
                # Report a dataset-level inconsistency with per-row notes
                for idx in df.index[~cur_missing]:
                    # Only flag rows that contribute to inconsistency (i.e., not the modal value)
                    # For simplicity, flag all non-missing rows when multiple currencies exist
                    errors.append({
                        "row_index": idx,
                        "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                        "field": currency_col,
                        "code": "consistency_error",
                        "message": f"Multiple currencies present in dataset: {currency_unique_values}"
                    })
    else:
        currency_check_status = "skipped_no_column"

    # Build errors DataFrame
    errors_df = pd.DataFrame(errors, columns=["row_index", "campaign_id", "field", "code", "message"])
    errors_df.sort_values(by=["row_index", "field"], inplace=True)

    # Summary metrics
    summary = {
        "row_count": int(n_rows),
        "error_count": int(len(errors_df)),
        "missing_campaign_id": int(((df["campaign_id"].isna()) | (df["campaign_id"].astype(str).str.strip() == "")).sum()),
        "invalid_channel": int(ch_invalid.sum()),
        "missing_channel": int(ch_null.sum()),
        "invalid_impression": int((~valid_numeric["impression"]).sum()),
        "invalid_click": int((~valid_numeric["click"]).sum()),
        "invalid_conversion": int((~valid_numeric["conversion"]).sum()),
        "invalid_cost": int((~valid_numeric["cost"]).sum()),
        "negative_impression": int((parsed_numeric["impression"] < 0).sum()),
        "negative_click": int((parsed_numeric["click"] < 0).sum()),
        "negative_conversion": int((parsed_numeric["conversion"] < 0).sum()),
        "negative_cost": int((parsed_numeric["cost"] < 0).sum()),
        "funnel_inconsistency": int(funnel_violation.sum()),
        "invalid_start_date": int((~start_valid).sum()),
        "invalid_end_date": int((~end_valid).sum()),
        "date_range_violation": int(interval_violation.sum()),
        "currency_check_status": currency_check_status,
        "currency_unique_values": currency_unique_values if currency_unique_values is not None else []
    }

    return {
        "fatal": False,
        "message": "Validation completed",
        "errors": errors_df,
        "summary": summary
    }

def main():
    parser = argparse.ArgumentParser(description="Validate campaign report dataset integrity.")
    parser.add_argument("--input", required=True, help="Path to input CSV file")
    parser.add_argument("--output", required=False, help="Path to output CSV file for row-level errors")
    parser.add_argument("--currency-col", required=False, help="Currency column name (e.g., 'currency')")
    parser.add_argument("--expected-currency", required=False, help="Expected currency code (e.g., 'USD')")
    parser.add_argument("--delimiter", required=False, default=",", help="CSV delimiter (default ',')")
    parser.add_argument("--encoding", required=False, default="utf-8", help="File encoding (default 'utf-8')")
    args = parser.parse_args()

    try:
        df = pd.read_csv(args.input, delimiter=args.delimiter, encoding=args.encoding)
    except Exception as e:
        print(f"Failed to read input file: {e}", file=sys.stderr)
        sys.exit(2)

    result = validate_campaign_report(df, currency_col=args.currency_col, expected_currency=args.expected_currency)

    if result.get("fatal", False):
        print(f"Fatal error: {result.get('message')}", file=sys.stderr)
        sys.exit(1)

    # Print summary
    summary = result["summary"]
    print("Validation Summary")
    print(f"Rows: {summary['row_count']}")
    print(f"Total errors: {summary['error_count']}")
    print(f"Missing campaign_id: {summary['missing_campaign_id']}")
    print(f"Missing channel: {summary['missing_channel']}")
    print(f"Invalid channel enum: {summary['invalid_channel']}")
    print(f"Invalid impression: {summary['invalid_impression']} | Negative impression: {summary['negative_impression']}")
    print(f"Invalid click: {summary['invalid_click']} | Negative click: {summary['negative_click']}")
    print(f"Invalid conversion: {summary['invalid_conversion']} | Negative conversion: {summary['negative_conversion']}")
    print(f"Invalid cost: {summary['invalid_cost']} | Negative cost: {summary['negative_cost']}")
    print(f"Funnel inconsistencies: {summary['funnel_inconsistency']}")
    print(f"Invalid start_date: {summary['invalid_start_date']}")
    print(f"Invalid end_date: {summary['invalid_end_date']}")
    print(f"Date range violations: {summary['date_range_violation']}")
    print(f"Currency check: {summary['currency_check_status']}")
    if summary["currency_unique_values"]:
        print(f"Currencies detected: {summary['currency_unique_values']}")

    # Write per-row errors if requested
    if args.output:
        try:
            result["errors"].to_csv(args.output, index=False)
            print(f"Row-level errors written to {args.output}")
        except Exception as e:
            print(f"Failed to write output errors CSV: {e}", file=sys.stderr)
            sys.exit(3)

if __name__ == "__main__":
    main()
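
Because the validation logic lives in a plain function, it can also be called in-process (for example from a notebook or another pipeline step) instead of through the CLI; a minimal sketch, assuming the script above was saved as validate_campaign_report.py:

# Example: in-process use of the validator
import pandas as pd

from validate_campaign_report import validate_campaign_report

df = pd.read_csv("campaign_report.csv")
result = validate_campaign_report(df, currency_col="currency", expected_currency="USD")
print("errors:", result["summary"]["error_count"])
result["errors"].to_csv("validation_report.csv", index=False)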

Example Details

Problem solved

Turns complex data-integrity checking into a single reusable prompt: it generates a runnable Python script with validation rules tailored to your data's characteristics, quickly locating missing values, duplicates, anomalies, and rule conflicts. It helps data teams establish a standardized pre-load quality gate before report launches, model training, or data delivery, saving time, reducing risk, and improving trust in the data, with explanations available in Chinese or English for cross-team collaboration. A trial gets you a script in three steps; paying unlocks advanced validation templates, result summaries, and team-collaboration features, turning data quality from experience-driven into rule-driven.

Target users

Data analysts

When taking over a new data source or merging multiple tables, quickly health-check data quality and auto-generate validation and repair scripts so that reports and insights are not distorted by dirty data.

Data engineers

Insert validations at key points in data pipelines, generate rules and reports in one step, intercept anomalies early, and reduce rollbacks and rework for stable data ingestion.

BI and operations teams

Before a campaign launches, verify the completeness and consistency of key fields, fix data gaps in time, keep metrics comparable and reviews trustworthy, and reduce misjudgments and disputes.

Feature summary

Generates data-integrity validation scripts in one step, with rules and thresholds tailored to the dataset's characteristics
Automatically detects missing, duplicate, and anomalous values and provides repair suggestions with example code, lowering cleaning costs
Lets you define required, unique, and value-range constraints per business field to quickly locate problem columns and records
Supports tolerance levels and alert severities and produces readable reports so data quality aligns with team standards
Adapts to a variety of table structures and scenarios, with extensible checks that save you from hand-writing scripts
Explains validation results in context, pointing out business risks and next steps
Templated parameter input that teams can reuse, shortening the path from requirement to validated delivery
Fits into day-to-day analysis workflows: run it and get conclusions plus an actionable fix list
Produces structured, clear explanations and reports for audit trails and cross-team communication

How to use the purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it in conversation, with no extra development. Suitable for quick personal trials and lightweight use.

2. Publish it as an API

Turn the prompt template into an API so your program can modify template parameters and call it directly, enabling automation and batch processing. Suitable for developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in an MCP client so your AI application can call the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between AI tools.

Prompt price
¥20.00
Try before you buy: pay only after it proves useful.

What you get after purchase

The complete prompt template
- 252 tokens in total
- 2 adjustable parameters
{ 数据集特征 } (dataset characteristics) { 输出语言 } (output language)
Access to community-contributed content
- Curated community examples to help you get started quickly (free for a limited time)
