Data Validation Script Writing Guide

Updated Sep 27, 2025

Generate precise, efficient Python scripts for validating data integrity.

Example 1

Below is a ready-to-run Python script that uses pandas to validate the integrity of an orders table plus a users table, writing out both issue details and a summary. The checks cover primary/foreign key consistency, amount range, channel enum values, date validity with timezone normalization, and missing and duplicate values.

Notes
- Input: two CSV files (orders.csv, users.csv); the paths can be changed in the configuration block.
- Output: a summary report on the console plus several issue-detail CSVs in the output/ directory.
- Merge: orders are left-joined to users to detect orphan orders (foreign key mismatches).
- Timezone: timestamps that carry a time or a timezone marker are normalized to Asia/Shanghai before the date is taken; date-only strings get no timezone shift (see the brief sketch below).
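
As a quick illustration of the timezone rule above, here is a minimal sketch (assuming pandas and pytz are available; the sample timestamps are invented for illustration):

```python
import pandas as pd
import pytz

tz = pytz.timezone("Asia/Shanghai")

# A timestamp with a timezone marker: convert to UTC, then to the target timezone, then take the date
with_tz = pd.to_datetime("2025-09-01T18:30:00Z", utc=True).tz_convert(tz)
# A naive timestamp: localize to the target timezone directly
naive = pd.to_datetime("2025-09-01 18:30:00").tz_localize(tz)

print(with_tz.date())  # 2025-09-02 (the UTC evening rolls over to the next day in Asia/Shanghai)
print(naive.date())    # 2025-09-01
```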

Script
```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import sys
from datetime import date, datetime, timedelta

import numpy as np
import pandas as pd
import pytz


############################
# Configuration
############################
ORDERS_CSV = "orders.csv"   # Path to the orders CSV; must contain columns: order_id, order_date, customer_id, amount, channel, currency
USERS_CSV  = "users.csv"    # Path to the users CSV; must contain columns: customer_id, signup_date, region
OUTPUT_DIR = "output"       # Directory for issue-detail output files
TARGET_TZ  = "Asia/Shanghai" # Target timezone for date/time normalization
ALLOWED_CHANNELS = {"web", "app"}  # Channel enum

# Reasonable date range (adjust as needed)
DATE_MIN = pd.to_datetime("1970-01-01").date()
DATE_MAX = (datetime.now(pytz.timezone(TARGET_TZ)) + timedelta(days=365)).date()  # tolerate up to one year in the future


############################
# Utility functions
############################
def ensure_output_dir(path: str):
    os.makedirs(path, exist_ok=True)


def read_csv_as_strings(csv_path: str, required_cols: set):
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"File not found: {csv_path}")

    # Read everything as strings to avoid losing leading zeros, scientific notation, etc.; strict type checks happen later
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False, na_values=["", "NA", "NaN", "null", "NULL"])
    # Normalize column names to lowercase
    df.columns = [c.strip().lower() for c in df.columns]
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"File {csv_path} is missing required columns: {missing}")

    # Strip leading/trailing whitespace from string columns
    for c in df.columns:
        if pd.api.types.is_string_dtype(df[c]):
            df[c] = df[c].str.strip()
    return df


def to_nullable_int(series: pd.Series):
    # Nulls are allowed; validate as integers (no decimals)
    s = series.copy()
    # Strip thousands separators (commas)
    s = s.str.replace(",", "", regex=False)
    # Pre-screen with a regex for valid integers (optional sign) to avoid misclassifying values like 1.0
    mask_nonempty = s.notna() & (s != "")
    mask_intlike = s.str.match(r"^[+-]?\d+$", na=False)
    out = pd.Series(pd.NA, index=s.index, dtype="Int64")
    out.loc[mask_nonempty & mask_intlike] = pd.to_numeric(s[mask_nonempty & mask_intlike], errors="coerce").astype("Int64")
    return out


def to_nullable_decimal(series: pd.Series):
    s = series.copy()
    s = s.str.replace(",", "", regex=False)
    # Decimals allowed; returns float64 with NaN preserved
    out = pd.to_numeric(s, errors="coerce")
    return out


def normalize_channel(series: pd.Series):
    s = series.str.lower().str.strip()
    return s


def parse_to_local_date(series: pd.Series, target_tz: str):
    """
    规则:
    - 纯日期(YYYY-MM-DD)直接解析为日期;
    - 含时间戳:
        * 含时区偏移/标记:转为UTC再转换到目标时区,最后取date;
        * 无时区:按目标时区本地化,再取date。
    返回:
    - dates: pd.Series of python date (或 NaT),dtype=object
    - meta: dict,包含计数统计
    """
    tz = pytz.timezone(target_tz)
    raw = series.astype("string").str.strip()

    mask_empty = raw.isna() | (raw == "")
    mask_date_only = raw.str.match(r"^\d{4}-\d{2}-\d{2}$", na=False)
    mask_has_tz = raw.str.contains(r"(?:Z|[+-]\d{2}:\d{2}|[+-]\d{4}|UTC|GMT)", case=False, na=False)
    mask_timestamp = ~mask_empty & ~mask_date_only

    dates = pd.Series(pd.NaT, index=raw.index, dtype="object")  # holds python date or NaT

    # Date-only values
    if mask_date_only.any():
        d = pd.to_datetime(raw[mask_date_only], format="%Y-%m-%d", errors="coerce")
        dates.loc[mask_date_only] = d.dt.date

    # Timestamps with timezone info
    mask_ts_tz = mask_timestamp & mask_has_tz
    if mask_ts_tz.any():
        dt_utc = pd.to_datetime(raw[mask_ts_tz], errors="coerce", utc=True)
        dt_local = dt_utc.dt.tz_convert(tz)
        dates.loc[mask_ts_tz] = dt_local.dt.date

    # Naive timestamps -> localize to the target timezone
    mask_ts_naive = mask_timestamp & (~mask_has_tz)
    if mask_ts_naive.any():
        dt_naive = pd.to_datetime(raw[mask_ts_naive], errors="coerce", utc=False)
        try:
            dt_localized = dt_naive.dt.tz_localize(tz)
        except TypeError:
            # If dt_naive already carries a timezone (rare mixed input), just convert it
            dt_localized = dt_naive.dt.tz_convert(tz)
        dates.loc[mask_ts_naive] = dt_localized.dt.date

    meta = {
        "empty": int(mask_empty.sum()),
        "date_only": int(mask_date_only.sum()),
        "timestamp_with_tz": int(mask_ts_tz.sum()),
        "timestamp_naive": int(mask_ts_naive.sum()),
        "parsed_na": int(pd.isna(dates).sum())
    }
    return dates, meta


def regex_iso_currency(series: pd.Series):
    # Basic currency-code format check (ISO 4217 shape): three uppercase letters
    s = series.astype("string")
    return s.str.fullmatch(r"[A-Z]{3}", na=False)


############################
# Validation logic
############################
def validate_orders(df_orders: pd.DataFrame):
    issues = {}

    # 1) Primary key and required fields
    df = df_orders.copy()
    df["order_id_int"] = to_nullable_int(df["order_id"])
    issues["order_id_null"] = df[df["order_id_int"].isna()][["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]
    # Duplicate primary keys
    dup_order = df["order_id_int"].duplicated(keep=False) & df["order_id_int"].notna()
    issues["order_pk_duplicates"] = df[dup_order][["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]

    # 2) Non-null constraint: customer_id
    issues["order_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][["order_id", "customer_id"]]

    # 3) amount must be a decimal >= 0
    df["amount_num"] = to_nullable_decimal(df["amount"])
    issues["amount_not_numeric"] = df[df["amount"].notna() & (df["amount"] != "") & df["amount_num"].isna()][["order_id", "amount"]]
    issues["amount_negative"] = df[df["amount_num"].notna() & (df["amount_num"] < 0)][["order_id", "amount", "amount_num"]]

    # 4) Channel enum
    df["channel_norm"] = normalize_channel(df["channel"])
    issues["channel_null"] = df[df["channel_norm"].isna() | (df["channel_norm"] == "")][["order_id", "channel"]]
    issues["channel_invalid_enum"] = df[df["channel_norm"].notna() & (df["channel_norm"] != "") & (~df["channel_norm"].isin(ALLOWED_CHANNELS))][["order_id", "channel"]]

    # 5) Currency default/format (default is CNY: empty means the default was not applied; non-empty values are not restricted)
    issues["currency_missing_should_default"] = df[df["currency"].isna() | (df["currency"] == "")][["order_id", "currency"]]
    # Optional: check for the 3-uppercase-letter format (informational)
    fmt_mask = regex_iso_currency(df["currency"])
    issues["currency_format_non_iso_like_info"] = df[(df["currency"].notna()) & (df["currency"] != "") & (~fmt_mask)][["order_id", "currency"]]

    # 6) Date validity and timezone normalization
    order_dates, meta_order = parse_to_local_date(df["order_date"], TARGET_TZ)
    df["order_date_norm"] = order_dates
    issues["order_date_unparseable"] = df[pd.isna(df["order_date_norm"]) & df["order_date"].notna() & (df["order_date"] != "")][["order_id", "order_date"]]
    # Date range
    mask_order_range_invalid = (~pd.isna(df["order_date_norm"])) & ((df["order_date_norm"] < DATE_MIN) | (df["order_date_norm"] > DATE_MAX))
    issues["order_date_out_of_range"] = df[mask_order_range_invalid][["order_id", "order_date", "order_date_norm"]]

    # 7) Full-row duplicates in the orders table (possibly from repeated exports)
    issues["order_fullrow_duplicates"] = df[df.duplicated(subset=["order_id", "order_date", "customer_id", "amount", "channel", "currency"], keep=False)]

    return df, issues, {"order_date_meta": meta_order}


def validate_users(df_users: pd.DataFrame):
    issues = {}
    df = df_users.copy()

    # 1) Primary key and required fields
    issues["user_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][["customer_id", "signup_date", "region"]]
    dup_user = df["customer_id"].duplicated(keep=False) & df["customer_id"].notna() & (df["customer_id"] != "")
    issues["user_pk_duplicates"] = df[dup_user][["customer_id", "signup_date", "region"]]

    # 2) Date validity and timezone normalization
    signup_dates, meta_user = parse_to_local_date(df["signup_date"], TARGET_TZ)
    df["signup_date_norm"] = signup_dates
    issues["signup_date_unparseable"] = df[pd.isna(df["signup_date_norm"]) & df["signup_date"].notna() & (df["signup_date"] != "")][["customer_id", "signup_date"]]
    mask_signup_range_invalid = (~pd.isna(df["signup_date_norm"])) & ((df["signup_date_norm"] < DATE_MIN) | (df["signup_date_norm"] > DATE_MAX))
    issues["signup_date_out_of_range"] = df[mask_signup_range_invalid][["customer_id", "signup_date", "signup_date_norm"]]

    return df, issues, {"signup_date_meta": meta_user}


def check_foreign_keys(df_orders_norm: pd.DataFrame, df_users_norm: pd.DataFrame):
    # Left join to detect orphan orders (foreign key mismatches)
    merged = df_orders_norm.merge(
        df_users_norm[["customer_id", "signup_date_norm", "region"]],
        on="customer_id",
        how="left",
        indicator=True,
        suffixes=("", "_user")
    )
    orphan = merged[merged["_merge"] == "left_only"]
    # Optional business consistency: order date >= signup date (when both are valid)
    has_both = merged["order_date_norm"].notna() & merged["signup_date_norm"].notna()
    violates_chronology = merged[has_both & (merged["order_date_norm"] < merged["signup_date_norm"])]

    fk_issues = {
        "fk_orphan_orders": orphan[["order_id", "customer_id", "order_date_norm", "signup_date_norm", "region"]],
        "order_before_signup": violates_chronology[["order_id", "customer_id", "order_date_norm", "signup_date_norm"]]
    }
    return merged, fk_issues


def save_issues(issues: dict, prefix: str):
    ensure_output_dir(OUTPUT_DIR)
    for name, df in issues.items():
        if df is None or len(df) == 0:
            continue
        out_path = os.path.join(OUTPUT_DIR, f"{prefix}__{name}.csv")
        df.to_csv(out_path, index=False, encoding="utf-8-sig")


def print_summary(total_orders: int, total_users: int, issues_all: dict, metas: dict):
    print("=== 数据完整性验证汇总 ===")
    print(f"- 订单表记录数: {total_orders}")
    print(f"- 用户表记录数: {total_users}")

    # 日期解析元信息
    if "order_date_meta" in metas:
        m = metas["order_date_meta"]
        print(f"- 订单日期解析: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")
    if "signup_date_meta" in metas:
        m = metas["signup_date_meta"]
        print(f"- 注册日期解析: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")

    # Count each issue type
    print("\n=== Issue Counts ===")
    for section, sec_issues in issues_all.items():
        for name, df in sec_issues.items():
            cnt = 0 if df is None else len(df)
            print(f"[{section}] {name}: {cnt}")

    print(f"\nDetail files written to directory: {OUTPUT_DIR}\n")


def main():
    try:
        # Load inputs
        orders_req = {"order_id", "order_date", "customer_id", "amount", "channel", "currency"}
        users_req  = {"customer_id", "signup_date", "region"}
        df_orders_raw = read_csv_as_strings(ORDERS_CSV, orders_req)
        df_users_raw  = read_csv_as_strings(USERS_CSV, users_req)

        # Validate each table
        df_orders_norm, order_issues, order_meta = validate_orders(df_orders_raw)
        df_users_norm, user_issues, user_meta = validate_users(df_users_raw)

        # Foreign keys and merge
        merged, fk_issues = check_foreign_keys(df_orders_norm, df_users_norm)

        # Summarize and write outputs
        save_issues(order_issues, "orders")
        save_issues(user_issues, "users")
        save_issues(fk_issues, "fk")

        # Write a sample of the normalized (merged) data (optional)
        sample_out = os.path.join(OUTPUT_DIR, "merged_sample.csv")
        merged.head(1000).to_csv(sample_out, index=False, encoding="utf-8-sig")

        # Print the summary
        issues_all = {"orders": order_issues, "users": user_issues, "merge_fk": fk_issues}
        metas = {"order_date_meta": order_meta["order_date_meta"], "signup_date_meta": user_meta["signup_date_meta"]}
        print_summary(len(df_orders_raw), len(df_users_raw), issues_all, metas)

        # Use the exit code to signal the presence of "severe" issues
        severe_counts = 0
        severe_counts += len(order_issues["order_id_null"])
        severe_counts += len(order_issues["order_pk_duplicates"])
        severe_counts += len(order_issues["order_customer_id_null"])
        severe_counts += len(order_issues["amount_negative"])
        severe_counts += len(order_issues["channel_null"]) + len(order_issues["channel_invalid_enum"])
        severe_counts += len(order_issues["order_date_unparseable"])
        severe_counts += len(user_issues["user_customer_id_null"])
        severe_counts += len(user_issues["user_pk_duplicates"])
        severe_counts += len(user_issues["signup_date_unparseable"])
        severe_counts += len(fk_issues["fk_orphan_orders"])

        if severe_counts > 0:
            sys.exit(2)
        else:
            sys.exit(0)

    except Exception as e:
        print(f"Run failed: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
```

Usage
- Prepare orders.csv and users.csv with the canonical column names used in the script (case-insensitive; the script lowercases them). A sketch for generating minimal sample files follows this list.
- Adjust the configuration at the top of the script as needed (file paths, target timezone, date range, etc.).
- Run: python validate_integrity.py
- Output:
  - Summary statistics printed to the console.
  - Per-issue detail CSVs under the output/ directory, for example:
    - orders__order_pk_duplicates.csv (duplicate order primary keys)
    - fk__fk_orphan_orders.csv (orders whose foreign key has no match)
    - orders__channel_invalid_enum.csv (invalid channel enum values)
    - orders__order_date_unparseable.csv / users__signup_date_unparseable.csv (unparseable dates)
  - merged_sample.csv holds the first 1,000 rows of the merged data for spot checks.
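
To try the script without real data, the following sketch fabricates minimal sample files (all values are invented and deliberately include a few violations):

```python
import pandas as pd

# Hypothetical sample data: an orphan customer_id, a negative amount, an invalid channel, a missing currency
pd.DataFrame({
    "order_id": ["1", "2", "3"],
    "order_date": ["2025-09-01", "2025-09-02T10:00:00Z", "2025-09-03"],
    "customer_id": ["u1", "u2", "u999"],
    "amount": ["100.50", "-3", "20"],
    "channel": ["web", "APP", "sms"],
    "currency": ["CNY", "", "USD"],
}).to_csv("orders.csv", index=False)

pd.DataFrame({
    "customer_id": ["u1", "u2"],
    "signup_date": ["2025-08-01", "2025-08-15"],
    "region": ["north", "south"],
}).to_csv("users.csv", index=False)
```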

Validation highlights and rationale
- Primary/foreign keys: order_id must be non-null and unique in the orders table; customer_id must be non-null and unique in the users table; every customer_id in orders must exist in the users table.
- Amount: if present, it must be numeric and >= 0; nulls are allowed (the given constraints do not require it to be non-null).
- Channel: only web and app are allowed; values are normalized to lowercase before comparison.
- Currency: the field defaults to CNY. Empty values are treated as the default not being applied and are recorded as errors; non-empty values are not restricted to an enum (multiple currencies are allowed). A three-uppercase-letter format check is also provided as an informational hint. (A minimal repair sketch follows this list.)
- Dates and timezones:
  - Date-only strings are parsed as calendar dates;
  - Timestamps with timezone info are normalized to UTC and then converted to the target timezone;
  - Naive timestamps are localized to the target timezone;
  - Everything is reduced to a "local date" for cross-table comparison and range checks;
  - Unparseable values and out-of-range dates are written out as detail files.
- Duplicates: beyond primary-key duplicates, full-row duplicates are also reported to surface repeated exports or repeated ingestion.
- Exit code: the process exits with code 2 when severe issues exist, so CI or schedulers can block automatically.
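
As a follow-up to the currency point above, here is a minimal repair sketch (assuming pandas, and assuming that backfilling the documented CNY default is acceptable for your pipeline):

```python
import pandas as pd

# Hypothetical repair step: backfill the documented default where currency is empty
df = pd.read_csv("orders.csv", dtype=str, keep_default_na=False)
missing = df["currency"].str.strip() == ""
print(f"Backfilling default currency CNY for {missing.sum()} rows")
df.loc[missing, "currency"] = "CNY"
df.to_csv("orders_repaired.csv", index=False)
```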

Example 2

Below is a self-contained PySpark script that deduplicates the described dataset and validates partition completeness, time-window bounds, enum legality, JSON structure, and required non-null fields. It produces two outputs: a "clean" deduplicated dataset and an "invalid" dataset with violation reasons. It can also optionally fail the job if expected partitions are missing.

Assumptions:
- Input is a partitioned Parquet dataset with column names: event_id (string), event_time (timestamp), user_id (string), event_type (string), properties (string containing JSON), dt (date partition).
- Enum values are case-insensitive; we normalize to lowercase before validation.
- Required non-null fields: event_id, event_time, dt.
- JSON field “properties” is nullable. If present, it must be a valid JSON object. Required JSON keys are configurable.
- Time window rule: to_date(event_time) must be within [dt - late_lag_days, dt + future_drift_days] (inclusive).

You can adjust configuration via command-line arguments.
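
For local testing, you might first write a tiny Parquet dataset that matches these assumptions (a sketch only; the rows and the /tmp path are invented):

```python
from datetime import datetime, date

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("make-sample-events").getOrCreate()

# Hypothetical rows matching the assumed schema; includes a duplicate event_id and an invalid event_type
rows = [
    ("e1", datetime(2025, 9, 20, 10, 0), "u1", "view", '{"item_id": "42", "price": "9.9"}', date(2025, 9, 20)),
    ("e1", datetime(2025, 9, 20, 11, 0), "u1", "view", '{"item_id": "42", "price": "9.9"}', date(2025, 9, 20)),
    ("e2", datetime(2025, 9, 23, 8, 0), "u2", "install", None, date(2025, 9, 21)),
]
cols = ["event_id", "event_time", "user_id", "event_type", "properties", "dt"]
spark.createDataFrame(rows, cols).write.mode("overwrite").partitionBy("dt").parquet("/tmp/events_parquet")
spark.stop()
```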

Python script (PySpark):

```python
#!/usr/bin/env python3
import argparse
import sys
from datetime import datetime, date, timedelta

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, lower, trim, to_date, date_add, date_sub, when, lit,
    array, array_union, size, from_json, element_at, row_number
)
from pyspark.sql.types import StringType, MapType

def parse_args():
    p = argparse.ArgumentParser(description="ETL integrity validation for event dataset")
    p.add_argument("--input-path", required=True, help="Input dataset path (Parquet), partitioned by dt")
    p.add_argument("--output-valid-path", required=True, help="Output path for valid, deduplicated dataset")
    p.add_argument("--output-invalid-path", required=True, help="Output path for invalid records with reasons")
    p.add_argument("--start-dt", required=True, help="Expected partition range start (YYYY-MM-DD)")
    p.add_argument("--end-dt", required=True, help="Expected partition range end (YYYY-MM-DD)")
    p.add_argument("--allowed-event-types", default="view,click,purchase", help="Comma-separated allowed event types")
    p.add_argument("--late-lag-days", type=int, default=1, help="Max allowed lateness (days) wrt dt")
    p.add_argument("--future-drift-days", type=int, default=0, help="Max allowed future drift (days) wrt dt")
    p.add_argument("--required-json-keys", default="", help="Comma-separated keys required within properties JSON (if properties is not null)")
    p.add_argument("--fail-on-partition-missing", action="store_true", help="Fail job if any expected dt partitions are missing")
    p.add_argument("--repartition", type=int, default=0, help="Optional number of partitions for output")
    return p.parse_args()

def daterange(start_dt: date, end_dt: date):
    d = start_dt
    out = []
    while d <= end_dt:
        out.append(d)
        d += timedelta(days=1)
    return out

def main():
    args = parse_args()

    start_dt = datetime.strptime(args.start_dt, "%Y-%m-%d").date()
    end_dt = datetime.strptime(args.end_dt, "%Y-%m-%d").date()
    if start_dt > end_dt:
        print("Error: start-dt must be <= end-dt", file=sys.stderr)
        sys.exit(2)

    allowed_event_types = [s.strip().lower() for s in args.allowed_event_types.split(",") if s.strip()]
    required_json_keys = [s.strip() for s in args.required_json_keys.split(",") if s.strip()]

    spark = (
        SparkSession.builder
        .appName("event-integrity-validation")
        .getOrCreate()
    )

    # Read input dataset
    df = spark.read.parquet(args.input_path)

    # Normalize types and values
    # - ensure event_type lowercase for enum validation
    # - ensure dt is date, event_time is timestamp
    df = df \
        .withColumn("event_type_norm", lower(trim(col("event_type")))) \
        .withColumn("dt", to_date(col("dt"))) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \
        .withColumn("event_id", col("event_id").cast(StringType())) \
        .withColumn("user_id", col("user_id").cast(StringType())) \
        .withColumn("properties", col("properties").cast(StringType()))

    # Deduplicate by primary key event_id:
    # Keep the row with the latest event_time; in tie, keep the lexicographically max JSON to stabilize.
    # Mark dropped duplicates for invalid reporting.
    w = Window.partitionBy("event_id").orderBy(
        col("event_time").desc_nulls_last(),
        col("properties").desc_nulls_last()
    )
    df_with_rownum = df.withColumn("rn", row_number().over(w))
    canonical = df_with_rownum.filter(col("rn") == 1).drop("rn")
    duplicates_removed = df_with_rownum.filter(col("rn") > 1).drop("rn") \
        .withColumn("violations", array(lit("duplicate_removed")))

    # Core validations on canonical set

    # Required non-null fields
    required_viol = array()
    required_viol = when(col("event_id").isNull(), array_union(required_viol, array(lit("missing_required:event_id")))).otherwise(required_viol)
    required_viol = when(col("event_time").isNull(), array_union(required_viol, array(lit("missing_required:event_time")))).otherwise(required_viol)
    required_viol = when(col("dt").isNull(), array_union(required_viol, array(lit("missing_required:dt")))).otherwise(required_viol)

    # Enum validation (event_type cannot be null and must be in allowed set)
    enum_valid = col("event_type_norm").isin(allowed_event_types)
    enum_viol = when(col("event_type_norm").isNull(), array(lit("invalid_enum:null"))) \
        .otherwise(when(enum_valid, array()).otherwise(array(lit("invalid_enum:event_type"))))

    # Time window validation:
    # Allowed event_time date must be within [dt - late_lag_days, dt + future_drift_days]
    event_date = to_date(col("event_time"))
    min_allowed_date = date_sub(col("dt"), args.late_lag_days)
    max_allowed_date = date_add(col("dt"), args.future_drift_days)
    time_viol = when(
        col("event_time").isNull(), array(lit("time_window:event_time_null"))
    ).otherwise(
        when((event_date < min_allowed_date) | (event_date > max_allowed_date),
             array(lit("time_window:out_of_range"))).otherwise(array())
    )

    # JSON validation: properties is nullable; if present must be a valid JSON object.
    # We parse as Map<String,String> for structural validation. If parse fails => null.
    json_schema = MapType(StringType(), StringType(), containsNull=True)
    parsed_props = from_json(trim(col("properties")), json_schema)
    json_viol = when(
        col("properties").isNull(), array()
    ).otherwise(
        when(parsed_props.isNull(), array(lit("invalid_json:parse_error"))).otherwise(array())
    )

    # Required JSON keys (if provided): keys must exist and be non-empty when properties is not null
    if required_json_keys:
        # For each required key, check existence and non-empty value
        for k in required_json_keys:
            json_viol = when(
                col("properties").isNull(), json_viol  # no requirement if properties is null
            ).otherwise(
                when(
                    (parsed_props.isNull()) | (element_at(parsed_props, lit(k)).isNull()) | (trim(element_at(parsed_props, lit(k))) == ""),
                    array_union(json_viol, array(lit(f"missing_json_key:{k}")))
                ).otherwise(json_viol)
            )

    # Aggregate violations for canonical records
    violations = array()
    violations = array_union(violations, required_viol)
    violations = array_union(violations, enum_viol)
    violations = array_union(violations, time_viol)
    violations = array_union(violations, json_viol)

    canonical_with_viol = canonical.withColumn("violations", violations)

    # Split valid vs invalid
    invalid_canonical = canonical_with_viol.filter(size(col("violations")) > 0)
    valid_canonical = canonical_with_viol.filter(size(col("violations")) == 0).drop("violations")

    # Combine invalid sets (duplicates + invalid canonical)
    invalid_all = invalid_canonical.unionByName(duplicates_removed, allowMissingColumns=True)

    # Partition completeness check
    expected_dates = [d.strftime("%Y-%m-%d") for d in daterange(start_dt, end_dt)]
    actual_dates = [r["dt"].strftime("%Y-%m-%d") for r in df.select("dt").distinct().collect() if r["dt"] is not None]
    expected_set = set(expected_dates)
    actual_set = set(actual_dates)
    missing_partitions = sorted(expected_set - actual_set)

    # Write outputs
    if args.repartition > 0:
        valid_canonical = valid_canonical.repartition(args.repartition, col("dt"))
        invalid_all = invalid_all.repartition(args.repartition)

    # Write valid dataset partitioned by dt to preserve downstream partitioning
    valid_canonical.write.mode("overwrite").partitionBy("dt").parquet(args.output_valid_path)
    invalid_all.write.mode("overwrite").parquet(args.output_invalid_path)

    # Metrics and summary
    total_in = df.count()
    total_valid = valid_canonical.count()
    total_invalid = invalid_all.count()
    total_duplicates = duplicates_removed.count()

    print("Validation Summary")
    print(f"- Input rows: {total_in}")
    print(f"- Valid (deduplicated) rows: {total_valid}")
    print(f"- Invalid rows: {total_invalid}")
    print(f"- Duplicates removed: {total_duplicates}")
    print(f"- Expected dt partitions: {len(expected_set)} [{args.start_dt}..{args.end_dt}]")
    print(f"- Actual dt partitions: {len(actual_set)}")
    if missing_partitions:
        print(f"- Missing dt partitions ({len(missing_partitions)}): {', '.join(missing_partitions)}")
        if args.fail_on_partition_missing:
            print("Failing due to missing partitions.", file=sys.stderr)
            spark.stop()
            sys.exit(1)
    else:
        print("- Partition completeness: OK")

    spark.stop()

if __name__ == "__main__":
    main()
```

Usage example:
- Validate a dataset on S3 with a 7-day completeness window, allowing 1 day of late arrival and no future drift; require the properties keys "item_id" and "price" when properties is present; fail if any partition is missing.

```
python validate_events.py \
  --input-path s3://bucket/events_parquet/ \
  --output-valid-path s3://bucket/events_valid/ \
  --output-invalid-path s3://bucket/events_invalid/ \
  --start-dt 2025-09-20 \
  --end-dt 2025-09-26 \
  --allowed-event-types view,click,purchase \
  --late-lag-days 1 \
  --future-drift-days 0 \
  --required-json-keys item_id,price \
  --fail-on-partition-missing \
  --repartition 200
```

Notes:
- Deduplication retains a single canonical row per event_id (latest event_time). Removed duplicates are captured in the invalid output with reason "duplicate_removed" (see the inspection sketch after these notes).
- The JSON structure check ensures the “properties” field, if present, parses into an object (map). If parsing fails, the record is flagged.
- Time-window validation operates on event_date relative to dt; adjust late_lag_days/future_drift_days to fit your pipeline’s allowed lateness.
- The script prints a concise summary and can hard-fail on missing partitions for strong SLA enforcement.
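
To dig into why records were rejected, here is a small inspection sketch (the path shown is the hypothetical --output-invalid-path from the example above; point it at your own run):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode

spark = SparkSession.builder.appName("inspect-invalid").getOrCreate()

# Hypothetical path: use whatever --output-invalid-path your job wrote to
invalid = spark.read.parquet("s3://bucket/events_invalid/")

# One record can carry several violation reasons; explode and count per reason
invalid.select(explode(col("violations")).alias("reason")) \
    .groupBy("reason") \
    .count() \
    .orderBy("count", ascending=False) \
    .show(truncate=False)

spark.stop()
```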

Example 3

Below is a self-contained Python script that validates the integrity of a campaign reporting dataset. It checks schema conformance, funnel consistency (conversion <= click <= impression), date intervals, missing and anomalous values, channel enumeration, and currency consistency (if a currency column is present or specified).

You can save it as validate_campaign_report.py and run:
python validate_campaign_report.py --input path/to/data.csv --output path/to/validation_report.csv --currency-col currency --expected-currency USD

If no currency column exists or none is specified, the currency check is skipped and reported as such in the summary.
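
To try the command above without real data, here is a minimal sketch that fabricates a small input file (the values are invented and deliberately include a funnel violation, an invalid channel, a reversed date range, and a currency mismatch):

```python
import pandas as pd

# Hypothetical sample: c2 has click > impression, c3 has a bad channel, reversed dates, and a non-USD currency
pd.DataFrame({
    "campaign_id": ["c1", "c2", "c3"],
    "channel": ["email", "ad", "sms"],
    "impression": [1000, 50, 200],
    "click": [100, 80, 20],
    "conversion": [10, 5, 2],
    "cost": [99.9, 10.0, 5.5],
    "start_date": ["2025-09-01", "2025-09-01", "2025-09-10"],
    "end_date": ["2025-09-07", "2025-09-07", "2025-09-05"],
    "currency": ["USD", "USD", "EUR"],
}).to_csv("data.csv", index=False)
```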

```python
#!/usr/bin/env python3
"""
Validate campaign reporting dataset integrity.

Required columns:
- campaign_id: string (non-null)
- channel: enum in {"email", "ad", "app"}
- impression: int >= 0
- click: int >= 0
- conversion: int >= 0
- cost: decimal >= 0
- start_date: date
- end_date: date

Checks:
- Missing values in required fields
- Type and range: non-negative, integer for counts
- Funnel consistency: conversion <= click <= impression
- Date parsing and interval: start_date <= end_date
- Channel enumeration correctness
- Currency consistency (if currency column provided or detected):
    - Non-null currency values
    - Single unique currency or match expected currency

Outputs a CSV of row-level errors and prints a summary to stdout.
"""

import argparse
import sys
import pandas as pd
from typing import Dict, List, Optional, Tuple

REQUIRED_COLUMNS = [
    "campaign_id", "channel", "impression", "click",
    "conversion", "cost", "start_date", "end_date"
]
ALLOWED_CHANNELS = {"email", "ad", "app"}

def coerce_numeric(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_numeric, is_valid_numeric) with NaN for invalid entries."""
    s = pd.to_numeric(df[col], errors="coerce")
    valid = ~s.isna()
    return s, valid

def coerce_date(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_datetime, is_valid_date) with NaT for invalid entries."""
    s = pd.to_datetime(df[col], errors="coerce", utc=False)
    valid = ~s.isna()
    return s, valid

def validate_campaign_report(
    df: pd.DataFrame,
    currency_col: Optional[str] = None,
    expected_currency: Optional[str] = None
) -> Dict:
    errors: List[Dict] = []

    # Normalize column names by stripping whitespace
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]

    # Check presence of required columns
    missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing_cols:
        return {
            "fatal": True,
            "message": f"Missing required columns: {missing_cols}",
            "errors": pd.DataFrame(columns=["row_index", "campaign_id", "field", "code", "message"]),
            "summary": {}
        }

    n_rows = len(df)

    # Validate campaign_id non-null and non-empty
    cid_null = df["campaign_id"].isna() | (df["campaign_id"].astype(str).str.strip() == "")
    for idx in df.index[cid_null]:
        errors.append({
            "row_index": idx,
            "campaign_id": None if pd.isna(df.loc[idx, "campaign_id"]) else str(df.loc[idx, "campaign_id"]),
            "field": "campaign_id",
            "code": "missing",
            "message": "campaign_id is null or empty"
        })

    # Validate channel enumeration and missing
    ch_null = df["channel"].isna() | (df["channel"].astype(str).str.strip() == "")
    for idx in df.index[ch_null]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "channel",
            "code": "missing",
            "message": "channel is null or empty"
        })
    # allowed values
    ch_str = df["channel"].astype(str).str.strip().str.lower()
    ch_invalid = ~ch_str.isin(ALLOWED_CHANNELS)
    # Skip invalid flag for rows already missing
    ch_invalid = ch_invalid & ~ch_null
    for idx in df.index[ch_invalid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "channel",
            "code": "enum_violation",
            "message": f"channel '{df.loc[idx, 'channel']}' not in {sorted(ALLOWED_CHANNELS)}"
        })

    # Numeric columns: impression, click, conversion, cost
    numeric_cols = ["impression", "click", "conversion", "cost"]
    parsed_numeric = {}
    valid_numeric = {}

    for col in numeric_cols:
        s_parsed, s_valid = coerce_numeric(df, col)
        parsed_numeric[col] = s_parsed
        valid_numeric[col] = s_valid

        # Missing or non-numeric
        invalid_mask = ~s_valid
        for idx in df.index[invalid_mask]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": col,
                "code": "type_error",
                "message": f"{col} is not numeric"
            })

        # Non-negative constraint
        nonneg_mask = s_parsed < 0
        for idx in df.index[nonneg_mask & s_valid]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": col,
                "code": "range_violation",
                "message": f"{col} must be >= 0"
            })

        # Integer constraint for count columns
        if col in {"impression", "click", "conversion"}:
            # Allow NaN failures already recorded; check integer-ness for valid values
            int_mask = s_parsed.dropna().apply(lambda x: float(x).is_integer())
            int_mask = int_mask.reindex(df.index, fill_value=False)
            non_integer = ~int_mask & s_valid
            for idx in df.index[non_integer]:
                errors.append({
                    "row_index": idx,
                    "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                    "field": col,
                    "code": "type_error",
                    "message": f"{col} must be an integer count"
                })

    # Funnel consistency: conversion <= click <= impression
    conv = parsed_numeric["conversion"]
    clk = parsed_numeric["click"]
    imp = parsed_numeric["impression"]
    # Only evaluate on rows where all three are valid and non-negative integers
    funnel_valid = valid_numeric["conversion"] & valid_numeric["click"] & valid_numeric["impression"]
    # Apply comparison
    funnel_violation = funnel_valid & ~((conv <= clk) & (clk <= imp))
    for idx in df.index[funnel_violation]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "funnel",
            "code": "consistency_error",
            "message": f"conversion({conv.loc[idx]}) <= click({clk.loc[idx]}) <= impression({imp.loc[idx]}) violated"
        })

    # Date columns
    start_parsed, start_valid = coerce_date(df, "start_date")
    end_parsed, end_valid = coerce_date(df, "end_date")

    for idx in df.index[~start_valid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "start_date",
            "code": "type_error",
            "message": "start_date is invalid or cannot be parsed"
        })
    for idx in df.index[~end_valid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "end_date",
            "code": "type_error",
            "message": "end_date is invalid or cannot be parsed"
        })

    # Date interval: start_date <= end_date
    date_interval_valid = start_valid & end_valid
    interval_violation = date_interval_valid & (start_parsed > end_parsed)
    for idx in df.index[interval_violation]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "date_range",
            "code": "range_violation",
            "message": f"start_date({start_parsed.loc[idx].date()}) > end_date({end_parsed.loc[idx].date()})"
        })

    # Currency consistency (optional)
    currency_check_status = "skipped"
    currency_unique_values = None
    if currency_col is None:
        # Try to auto-detect common currency column names
        for candidate in ["currency", "cost_currency"]:
            if candidate in df.columns:
                currency_col = candidate
                break

    if currency_col is not None and currency_col in df.columns:
        currency_check_status = "performed"
        cur_series = df[currency_col].astype(str).str.strip()
        cur_missing = cur_series.isna() | (cur_series == "") | (cur_series.str.lower() == "nan")
        for idx in df.index[cur_missing]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": currency_col,
                "code": "missing",
                "message": f"{currency_col} is null or empty"
            })

        currency_unique_values = sorted(cur_series[~cur_missing].unique().tolist())
        if expected_currency is not None:
            mism = (~cur_missing) & (cur_series != expected_currency)
            for idx in df.index[mism]:
                errors.append({
                    "row_index": idx,
                    "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                    "field": currency_col,
                    "code": "consistency_error",
                    "message": f"Currency '{cur_series.loc[idx]}' != expected '{expected_currency}'"
                })
        else:
            # Require single currency across dataset
            if len(set(cur_series[~cur_missing])) > 1:
                # Report a dataset-level inconsistency with per-row notes
                for idx in df.index[~cur_missing]:
                    # For simplicity, flag every non-missing row when multiple currencies exist
                    errors.append({
                        "row_index": idx,
                        "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                        "field": currency_col,
                        "code": "consistency_error",
                        "message": f"Multiple currencies present in dataset: {currency_unique_values}"
                    })
    else:
        currency_check_status = "skipped_no_column"

    # Build errors DataFrame
    errors_df = pd.DataFrame(errors, columns=["row_index", "campaign_id", "field", "code", "message"])
    errors_df.sort_values(by=["row_index", "field"], inplace=True)

    # Summary metrics
    summary = {
        "row_count": int(n_rows),
        "error_count": int(len(errors_df)),
        "missing_campaign_id": int(((df["campaign_id"].isna()) | (df["campaign_id"].astype(str).str.strip() == "")).sum()),
        "invalid_channel": int(ch_invalid.sum()),
        "missing_channel": int(ch_null.sum()),
        "invalid_impression": int((~valid_numeric["impression"]).sum()),
        "invalid_click": int((~valid_numeric["click"]).sum()),
        "invalid_conversion": int((~valid_numeric["conversion"]).sum()),
        "invalid_cost": int((~valid_numeric["cost"]).sum()),
        "negative_impression": int((parsed_numeric["impression"] < 0).sum()),
        "negative_click": int((parsed_numeric["click"] < 0).sum()),
        "negative_conversion": int((parsed_numeric["conversion"] < 0).sum()),
        "negative_cost": int((parsed_numeric["cost"] < 0).sum()),
        "funnel_inconsistency": int(funnel_violation.sum()),
        "invalid_start_date": int((~start_valid).sum()),
        "invalid_end_date": int((~end_valid).sum()),
        "date_range_violation": int(interval_violation.sum()),
        "currency_check_status": currency_check_status,
        "currency_unique_values": currency_unique_values if currency_unique_values is not None else []
    }

    return {
        "fatal": False,
        "message": "Validation completed",
        "errors": errors_df,
        "summary": summary
    }

def main():
    parser = argparse.ArgumentParser(description="Validate campaign report dataset integrity.")
    parser.add_argument("--input", required=True, help="Path to input CSV file")
    parser.add_argument("--output", required=False, help="Path to output CSV file for row-level errors")
    parser.add_argument("--currency-col", required=False, help="Currency column name (e.g., 'currency')")
    parser.add_argument("--expected-currency", required=False, help="Expected currency code (e.g., 'USD')")
    parser.add_argument("--delimiter", required=False, default=",", help="CSV delimiter (default ',')")
    parser.add_argument("--encoding", required=False, default="utf-8", help="File encoding (default 'utf-8')")
    args = parser.parse_args()

    try:
        df = pd.read_csv(args.input, delimiter=args.delimiter, encoding=args.encoding)
    except Exception as e:
        print(f"Failed to read input file: {e}", file=sys.stderr)
        sys.exit(2)

    result = validate_campaign_report(df, currency_col=args.currency_col, expected_currency=args.expected_currency)

    if result.get("fatal", False):
        print(f"Fatal error: {result.get('message')}", file=sys.stderr)
        sys.exit(1)

    # Print summary
    summary = result["summary"]
    print("Validation Summary")
    print(f"Rows: {summary['row_count']}")
    print(f"Total errors: {summary['error_count']}")
    print(f"Missing campaign_id: {summary['missing_campaign_id']}")
    print(f"Missing channel: {summary['missing_channel']}")
    print(f"Invalid channel enum: {summary['invalid_channel']}")
    print(f"Invalid impression: {summary['invalid_impression']} | Negative impression: {summary['negative_impression']}")
    print(f"Invalid click: {summary['invalid_click']} | Negative click: {summary['negative_click']}")
    print(f"Invalid conversion: {summary['invalid_conversion']} | Negative conversion: {summary['negative_conversion']}")
    print(f"Invalid cost: {summary['invalid_cost']} | Negative cost: {summary['negative_cost']}")
    print(f"Funnel inconsistencies: {summary['funnel_inconsistency']}")
    print(f"Invalid start_date: {summary['invalid_start_date']}")
    print(f"Invalid end_date: {summary['invalid_end_date']}")
    print(f"Date range violations: {summary['date_range_violation']}")
    print(f"Currency check: {summary['currency_check_status']}")
    if summary["currency_unique_values"]:
        print(f"Currencies detected: {summary['currency_unique_values']}")

    # Write per-row errors if requested
    if args.output:
        try:
            result["errors"].to_csv(args.output, index=False)
            print(f"Row-level errors written to {args.output}")
        except Exception as e:
            print(f"Failed to write output errors CSV: {e}", file=sys.stderr)
            sys.exit(3)

if __name__ == "__main__":
    main()
```

Who it is for

Data analysts

When taking over a new data source or joining multiple tables, quickly health-check data quality and auto-generate validation and repair scripts so that reports and insights are not distorted by dirty data.

Data engineers

Insert validation at key points in the pipeline and generate rules and reports in one step, catching anomalies early, reducing rollbacks and rework, and keeping data ingestion and landing stable.

BI and operations teams

Before a campaign goes live, verify the completeness and consistent definitions of key fields, fix data gaps in time, keep metrics comparable and post-mortems trustworthy, and reduce misjudgments and disputes.

Machine learning engineers

Before training, check sample consistency, label anomalies, and distribution drift, and get handling suggestions, improving model stability and launch success while keeping data bias from degrading results.

Compliance and audit staff

Automatically generate traceable inspection reports and records to meet compliance reviews, quickly locate the problem step, and reduce data-risk exposure and audit costs.

Data quality owners

Build a standardized validation checklist and run it in batches, continuously monitor data health, drive each domain to meet the bar and close the improvement loop, and establish a data-quality governance mechanism.

What it solves

It turns complex data-integrity checking into one reusable prompt: it generates runnable Python scripts, tailors the checks to your data's characteristics, and quickly pinpoints missing values, duplicates, anomalies, and rule conflicts. It helps data teams set up a standardized "pre-load quality gate" before reports go live, models are trained, or data is delivered, saving effort, lowering risk, and improving trust in the data. Explanations can be produced in Chinese or English for cross-team collaboration. The free trial gets you a script in three steps; paying unlocks advanced validation templates, result summaries, and team collaboration, upgrading data quality from "relying on experience" to "following a standard".

Feature summary

Generates data-integrity validation scripts in one step, with rules and thresholds tailored to the dataset's characteristics
Automatically flags missing, duplicate, and anomalous values, with repair suggestions and example code to cut cleaning costs
Lets you define required, unique, and value-range constraints per business field to quickly locate problem columns and records
Supports tolerance and alert levels and produces readable reports so data quality aligns with team standards
Adapts to a variety of table structures and scenarios, with extensible checks that spare you from hand-writing scripts
Explains validation results in context, pointing out the business risk and the next processing steps
Template-driven parameter input that teams can reuse, shortening validation and delivery from requirement to launch
Fits into everyday analysis workflows: run it and get check conclusions plus an actionable repair list
Produces structured, clear explanations and reports for audit trails and cross-team communication

How to use a purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suited to quick personal trials and lightweight use.

2. Publish it as an API endpoint

Turn the prompt template into an API: your program can modify the template parameters at will and call it through the interface, enabling automation and batch processing. Suited to developer integration and embedding in business systems.

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so that your AI application calls the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.

¥20.00
The platform offers a free trial so you can confirm the results meet your expectations before paying.

What you get after purchase

The complete prompt template
- 252 tokens in total
- 2 adjustable parameters
{ dataset characteristics } { output language }
Automatic addition to "My Prompt Library"
- Prompt optimizer support
- Versioned management
Community-shared application examples
Free for a limited time
