Generate Python scripts for data-integrity validation, precise and efficient.
Below is a ready-to-run Python script that uses pandas to validate the integrity of an "orders table + users table" pair and outputs both issue details and a summary. Checks covered include primary/foreign-key consistency, amount range, channel enumeration, date validity with timezone normalization, and missing/duplicate values.

Notes
- Input: two CSV files (orders.csv, users.csv); paths can be changed in the configuration block.
- Output: a console summary report plus per-issue detail CSVs written to the output/ directory.
- Join: orders are left-joined to users to detect orphan orders (foreign-key mismatches).
- Timezone: timestamps that carry a time or timezone are normalized to Asia/Shanghai before the date is taken; pure date strings are not shifted.

Script

```python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import os
import sys
from datetime import datetime, timedelta

import pandas as pd
import pytz

############################
# Configuration
############################
ORDERS_CSV = "orders.csv"   # Orders CSV; required columns: order_id, order_date, customer_id, amount, channel, currency
USERS_CSV = "users.csv"     # Users CSV; required columns: customer_id, signup_date, region
OUTPUT_DIR = "output"       # Directory for issue detail files
TARGET_TZ = "Asia/Shanghai" # Target timezone for datetime normalization
ALLOWED_CHANNELS = {"web", "app"}  # Channel enumeration

# Reasonable date range (adjust as needed)
DATE_MIN = pd.to_datetime("1970-01-01").date()
DATE_MAX = (datetime.now(pytz.timezone(TARGET_TZ)) + timedelta(days=365)).date()  # tolerate up to one year in the future

############################
# Helpers
############################
def ensure_output_dir(path: str):
    os.makedirs(path, exist_ok=True)


def read_csv_as_strings(csv_path: str, required_cols: set):
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"File not found: {csv_path}")
    # Read everything as strings to avoid losing leading zeros / scientific-notation surprises;
    # strict type validation happens later.
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False,
                     na_values=["", "NA", "NaN", "null", "NULL"])
    # Normalize column names to lowercase
    df.columns = [c.strip().lower() for c in df.columns]
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"File {csv_path} is missing required columns: {missing}")
    # Strip leading/trailing whitespace from string columns
    for c in df.columns:
        if pd.api.types.is_string_dtype(df[c]):
            df[c] = df[c].str.strip()
    return df


def to_nullable_int(series: pd.Series):
    # Allow nulls; validate integers (no decimals)
    s = series.copy()
    # Drop thousands separators
    s = s.str.replace(",", "", regex=False)
    # Pre-screen with a regex for legal integers (optional sign) so values like 1.0 are rejected
    mask_nonempty = s.notna() & (s != "")
    mask_intlike = s.str.match(r"^[+-]?\d+$", na=False)
    out = pd.Series(pd.NA, index=s.index, dtype="Int64")
    out.loc[mask_nonempty & mask_intlike] = pd.to_numeric(
        s[mask_nonempty & mask_intlike], errors="coerce"
    ).astype("Int64")
    return out


def to_nullable_decimal(series: pd.Series):
    s = series.copy()
    s = s.str.replace(",", "", regex=False)
    # Allow decimals; return float64 keeping NaN
    out = pd.to_numeric(s, errors="coerce")
    return out


def normalize_channel(series: pd.Series):
    s = series.str.lower().str.strip()
    return s


def parse_to_local_date(series: pd.Series, target_tz: str):
    """
    Rules:
    - Pure dates (YYYY-MM-DD) are parsed directly as dates.
    - Timestamps:
      * with a timezone offset/marker: convert to UTC, then to the target timezone, then take the date;
      * without a timezone: localize to the target timezone, then take the date.
    Returns:
    - dates: pd.Series of python date (or NaT), dtype=object
    - meta: dict with parsing counters
    """
    tz = pytz.timezone(target_tz)
    raw = series.astype("string").str.strip()

    mask_empty = raw.isna() | (raw == "")
    mask_date_only = raw.str.match(r"^\d{4}-\d{2}-\d{2}$", na=False)
    mask_has_tz = raw.str.contains(r"(?:Z|[+-]\d{2}:\d{2}|[+-]\d{4}|UTC|GMT)", case=False, na=False)
    mask_timestamp = ~mask_empty & ~mask_date_only

    dates = pd.Series(pd.NaT, index=raw.index, dtype="object")  # holds python date or NaT

    # Pure dates
    if mask_date_only.any():
        d = pd.to_datetime(raw[mask_date_only], format="%Y-%m-%d", errors="coerce")
        dates.loc[mask_date_only] = d.dt.date

    # Timestamps with timezone info
    mask_ts_tz = mask_timestamp & mask_has_tz
    if mask_ts_tz.any():
        dt_utc = pd.to_datetime(raw[mask_ts_tz], errors="coerce", utc=True)
        dt_local = dt_utc.dt.tz_convert(tz)
        dates.loc[mask_ts_tz] = dt_local.dt.date

    # Naive timestamps -> localize to the target timezone
    mask_ts_naive = mask_timestamp & (~mask_has_tz)
    if mask_ts_naive.any():
        dt_naive = pd.to_datetime(raw[mask_ts_naive], errors="coerce", utc=False)
        try:
            dt_localized = dt_naive.dt.tz_localize(tz)
        except TypeError:
            # Fallback if dt_naive is already tz-aware (rare mixed input)
            dt_localized = dt_naive
            dt_localized = dt_localized.dt.tz_convert(tz)
        dates.loc[mask_ts_naive] = dt_localized.dt.date

    meta = {
        "empty": int(mask_empty.sum()),
        "date_only": int(mask_date_only.sum()),
        "timestamp_with_tz": int(mask_ts_tz.sum()),
        "timestamp_naive": int(mask_ts_naive.sum()),
        "parsed_na": int(pd.isna(dates).sum())
    }
    return dates, meta


def regex_iso_currency(series: pd.Series):
    # Simple currency-code format check (ISO-4217-like): 3 uppercase letters
    s = series.astype("string")
    return s.str.fullmatch(r"[A-Z]{3}", na=False)


############################
# Validation logic
############################
def validate_orders(df_orders: pd.DataFrame):
    issues = {}
    df = df_orders.copy()

    # 1) Primary key and not-null constraint
    df["order_id_int"] = to_nullable_int(df["order_id"])
    issues["order_id_null"] = df[df["order_id_int"].isna()][
        ["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]

    # Duplicate primary keys
    dup_order = df["order_id_int"].duplicated(keep=False) & df["order_id_int"].notna()
    issues["order_pk_duplicates"] = df[dup_order][
        ["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]

    # 2) Not-null constraint: customer_id
    issues["order_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][
        ["order_id", "customer_id"]]

    # 3) Amount: decimal >= 0
    df["amount_num"] = to_nullable_decimal(df["amount"])
    issues["amount_not_numeric"] = df[df["amount"].notna() & (df["amount"] != "") & df["amount_num"].isna()][
        ["order_id", "amount"]]
    issues["amount_negative"] = df[df["amount_num"].notna() & (df["amount_num"] < 0)][
        ["order_id", "amount", "amount_num"]]

    # 4) Channel enumeration
    df["channel_norm"] = normalize_channel(df["channel"])
    issues["channel_null"] = df[df["channel_norm"].isna() | (df["channel_norm"] == "")][
        ["order_id", "channel"]]
    issues["channel_invalid_enum"] = df[df["channel_norm"].notna() & (df["channel_norm"] != "") &
                                        (~df["channel_norm"].isin(ALLOWED_CHANNELS))][
        ["order_id", "channel"]]

    # 5) Currency default/format (default is CNY: an empty value means the default was not applied;
    #    non-empty values are not restricted)
    issues["currency_missing_should_default"] = df[df["currency"].isna() | (df["currency"] == "")][
        ["order_id", "currency"]]
    # Optional: check 3-uppercase-letter format (informational)
    fmt_mask = regex_iso_currency(df["currency"])
    issues["currency_format_non_iso_like_info"] = df[(df["currency"].notna()) & (df["currency"] != "") & (~fmt_mask)][
        ["order_id", "currency"]]

    # 6) Date validity and timezone normalization
    order_dates, meta_order = parse_to_local_date(df["order_date"], TARGET_TZ)
    df["order_date_norm"] = order_dates
    issues["order_date_unparseable"] = df[pd.isna(df["order_date_norm"]) & df["order_date"].notna() &
                                          (df["order_date"] != "")][["order_id", "order_date"]]
    # Date range
    mask_order_range_invalid = (~pd.isna(df["order_date_norm"])) & (
        (df["order_date_norm"] < DATE_MIN) | (df["order_date_norm"] > DATE_MAX))
    issues["order_date_out_of_range"] = df[mask_order_range_invalid][
        ["order_id", "order_date", "order_date_norm"]]

    # 7) Full-row duplicates in the orders table (possibly from duplicated exports)
    issues["order_fullrow_duplicates"] = df[df.duplicated(
        subset=["order_id", "order_date", "customer_id", "amount", "channel", "currency"], keep=False)]

    return df, issues, {"order_date_meta": meta_order}


def validate_users(df_users: pd.DataFrame):
    issues = {}
    df = df_users.copy()

    # 1) Primary key and not-null constraint
    issues["user_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][
        ["customer_id", "signup_date", "region"]]
    dup_user = df["customer_id"].duplicated(keep=False) & df["customer_id"].notna() & (df["customer_id"] != "")
    issues["user_pk_duplicates"] = df[dup_user][["customer_id", "signup_date", "region"]]

    # 2) Date validity and timezone normalization
    signup_dates, meta_user = parse_to_local_date(df["signup_date"], TARGET_TZ)
    df["signup_date_norm"] = signup_dates
    issues["signup_date_unparseable"] = df[pd.isna(df["signup_date_norm"]) & df["signup_date"].notna() &
                                           (df["signup_date"] != "")][["customer_id", "signup_date"]]
    mask_signup_range_invalid = (~pd.isna(df["signup_date_norm"])) & (
        (df["signup_date_norm"] < DATE_MIN) | (df["signup_date_norm"] > DATE_MAX))
    issues["signup_date_out_of_range"] = df[mask_signup_range_invalid][
        ["customer_id", "signup_date", "signup_date_norm"]]

    return df, issues, {"signup_date_meta": meta_user}


def check_foreign_keys(df_orders_norm: pd.DataFrame, df_users_norm: pd.DataFrame):
    # Left join to detect orphan orders (foreign-key mismatches)
    merged = df_orders_norm.merge(
        df_users_norm[["customer_id", "signup_date_norm", "region"]],
        on="customer_id", how="left", indicator=True, suffixes=("", "_user")
    )
    orphan = merged[merged["_merge"] == "left_only"]

    # Optional business consistency: order date >= signup date (when both are valid)
    has_both = merged["order_date_norm"].notna() & merged["signup_date_norm"].notna()
    violates_chronology = merged[has_both & (merged["order_date_norm"] < merged["signup_date_norm"])]

    fk_issues = {
        "fk_orphan_orders": orphan[["order_id", "customer_id", "order_date_norm", "signup_date_norm", "region"]],
        "order_before_signup": violates_chronology[["order_id", "customer_id", "order_date_norm", "signup_date_norm"]]
    }
    return merged, fk_issues


def save_issues(issues: dict, prefix: str):
    ensure_output_dir(OUTPUT_DIR)
    for name, df in issues.items():
        if df is None or len(df) == 0:
            continue
        out_path = os.path.join(OUTPUT_DIR, f"{prefix}__{name}.csv")
        df.to_csv(out_path, index=False, encoding="utf-8-sig")


def print_summary(total_orders: int, total_users: int, issues_all: dict, metas: dict):
    print("=== Data integrity validation summary ===")
    print(f"- Orders rows: {total_orders}")
    print(f"- Users rows: {total_users}")

    # Date parsing metadata
    if "order_date_meta" in metas:
        m = metas["order_date_meta"]
        print(f"- Order date parsing: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, "
              f"ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")
    if "signup_date_meta" in metas:
        m = metas["signup_date_meta"]
        print(f"- Signup date parsing: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, "
              f"ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")

    # Per-issue counts
    print("\n=== Issue counts ===")
    for section, sec_issues in issues_all.items():
        for name, df in sec_issues.items():
            cnt = 0 if df is None else len(df)
            print(f"[{section}] {name}: {cnt}")

    print(f"\nDetails written to: {OUTPUT_DIR}\n")


def main():
    try:
        # Read inputs
        orders_req = {"order_id", "order_date", "customer_id", "amount", "channel", "currency"}
        users_req = {"customer_id", "signup_date", "region"}
        df_orders_raw = read_csv_as_strings(ORDERS_CSV, orders_req)
        df_users_raw = read_csv_as_strings(USERS_CSV, users_req)

        # Validate each table
        df_orders_norm, order_issues, order_meta = validate_orders(df_orders_raw)
        df_users_norm, user_issues, user_meta = validate_users(df_users_raw)

        # Foreign keys and merge
        merged, fk_issues = check_foreign_keys(df_orders_norm, df_users_norm)

        # Save issue details
        save_issues(order_issues, "orders")
        save_issues(user_issues, "users")
        save_issues(fk_issues, "fk")

        # Optional: write a normalized (merged) sample
        sample_out = os.path.join(OUTPUT_DIR, "merged_sample.csv")
        merged.head(1000).to_csv(sample_out, index=False, encoding="utf-8-sig")

        # Print summary
        issues_all = {"orders": order_issues, "users": user_issues, "merge_fk": fk_issues}
        metas = {"order_date_meta": order_meta["order_date_meta"],
                 "signup_date_meta": user_meta["signup_date_meta"]}
        print_summary(len(df_orders_raw), len(df_users_raw), issues_all, metas)

        # Use the exit code to flag "severe" problems
        severe_counts = 0
        severe_counts += len(order_issues["order_id_null"])
        severe_counts += len(order_issues["order_pk_duplicates"])
        severe_counts += len(order_issues["order_customer_id_null"])
        severe_counts += len(order_issues["amount_negative"])
        severe_counts += len(order_issues["channel_null"]) + len(order_issues["channel_invalid_enum"])
        severe_counts += len(order_issues["order_date_unparseable"])
        severe_counts += len(user_issues["user_customer_id_null"])
        severe_counts += len(user_issues["user_pk_duplicates"])
        severe_counts += len(user_issues["signup_date_unparseable"])
        severe_counts += len(fk_issues["fk_orphan_orders"])

        if severe_counts > 0:
            sys.exit(2)
        else:
            sys.exit(0)
    except Exception as e:
        print(f"Run failed: {e}", file=sys.stderr)
        sys.exit(1)


if __name__ == "__main__":
    main()
```

Usage
- Prepare orders.csv and users.csv with the canonical column names used in the script (case-insensitive; the script lowercases them).
- Adjust the configuration at the top as needed (file paths, target timezone, date range, and so on).
- Run: python validate_integrity.py
- Output:
  - A summary is printed to the console.
  - Per-issue detail CSVs are written to output/, for example:
    - orders__order_pk_duplicates.csv (duplicate order primary keys)
    - fk__fk_orphan_orders.csv (orders whose customer_id has no match)
    - orders__channel_invalid_enum.csv (illegal channel values)
    - orders__order_date_unparseable.csv / users__signup_date_unparseable.csv (unparseable dates)
  - merged_sample.csv contains the first 1000 merged rows for spot checking.

Validation points and rationale
- Primary/foreign keys: order_id must be non-null and unique in the orders table; customer_id must be non-null and unique in the users table; every customer_id in orders must exist in users.
- Amount: if present, it must be numeric and >= 0; nulls are allowed (the given constraints do not require the field).
- Channel: only web and app are allowed; values are normalized to lowercase before comparison.
- Currency: the field defaults to CNY. An empty value is treated as a default that was never applied and is reported as an error; non-empty values are not restricted to an enumeration (multiple currencies are allowed). A format check (three uppercase letters) is included as an informational hint.
- Dates and timezones:
  - pure date strings are parsed as calendar dates;
  - timestamps with a timezone are normalized to UTC and then converted to the target timezone;
  - naive timestamps are localized to the target timezone;
  - everything is reduced to a local date for cross-table comparison and range checks;
  - unparseable values and out-of-range dates are written out as detail files.
- Duplicates: besides primary-key duplicates, full-row duplicates are reported to surface repeated exports or repeated ingestion.
- Exit code: the process exits with code 2 when severe problems exist, so it can gate CI or a scheduler automatically.
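To try the script end to end, the snippet below is a minimal smoke test: it writes two tiny CSVs with deliberate defects (a duplicate order_id, an orphan customer_id, a negative amount, an invalid channel, a missing currency) and then runs the validator as a subprocess. It assumes the script above is saved as validate_integrity.py in the same directory; the sample rows and file names are invented for illustration only.

```python
# Minimal smoke test for the validator above (assumed saved as validate_integrity.py).
# The sample rows below are illustrative, not part of the original specification.
import subprocess
import sys

orders_csv = """order_id,order_date,customer_id,amount,channel,currency
1,2024-05-01,100,59.90,web,CNY
1,2024-05-01,100,59.90,web,CNY
2,2024-05-02T10:15:00+00:00,999,-10,APP,CNY
3,2024-05-03,101,20.00,phone,
"""

users_csv = """customer_id,signup_date,region
100,2024-04-20,CN
101,2024-04-25,CN
"""

with open("orders.csv", "w", encoding="utf-8") as f:
    f.write(orders_csv)
with open("users.csv", "w", encoding="utf-8") as f:
    f.write(users_csv)

# Exit code 2 is expected here: duplicate PK, orphan order, negative amount, bad channel.
proc = subprocess.run([sys.executable, "validate_integrity.py"])
print("validator exit code:", proc.returncode)
```

After the run, output/ should contain detail files such as orders__order_pk_duplicates.csv and fk__fk_orphan_orders.csv for the seeded defects.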
Below is a self-contained PySpark script that validates the described dataset for deduplication, partition completeness, time-window bounds, enum legality, JSON structure, and required non-null fields. It produces two outputs: a "clean" deduplicated dataset and an "invalid" dataset with violation reasons. It can also optionally fail the job if expected partitions are missing.

Assumptions:
- Input is a partitioned Parquet dataset with columns: event_id (string), event_time (timestamp), user_id (string), event_type (string), properties (string containing JSON), dt (date partition).
- Enum values are case-insensitive; they are normalized to lowercase before validation.
- Required non-null fields: event_id, event_time, dt.
- The JSON field properties is nullable. If present, it must be a valid JSON object. Required JSON keys are configurable.
- Time-window rule: to_date(event_time) must be within [dt - late_lag_days, dt + future_drift_days] (inclusive).

Configuration is adjustable via command-line arguments.

Python script (PySpark):

```python
#!/usr/bin/env python3
import argparse
import sys
from datetime import datetime, date, timedelta

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, lower, trim, to_date, date_add, date_sub, row_number, when, lit,
    array, array_union, size, from_json, element_at
)
from pyspark.sql.types import StringType, MapType


def parse_args():
    p = argparse.ArgumentParser(description="ETL integrity validation for event dataset")
    p.add_argument("--input-path", required=True, help="Input dataset path (Parquet), partitioned by dt")
    p.add_argument("--output-valid-path", required=True, help="Output path for valid, deduplicated dataset")
    p.add_argument("--output-invalid-path", required=True, help="Output path for invalid records with reasons")
    p.add_argument("--start-dt", required=True, help="Expected partition range start (YYYY-MM-DD)")
    p.add_argument("--end-dt", required=True, help="Expected partition range end (YYYY-MM-DD)")
    p.add_argument("--allowed-event-types", default="view,click,purchase",
                   help="Comma-separated allowed event types")
    p.add_argument("--late-lag-days", type=int, default=1, help="Max allowed lateness (days) wrt dt")
    p.add_argument("--future-drift-days", type=int, default=0, help="Max allowed future drift (days) wrt dt")
    p.add_argument("--required-json-keys", default="",
                   help="Comma-separated keys required within properties JSON (if properties is not null)")
    p.add_argument("--fail-on-partition-missing", action="store_true",
                   help="Fail job if any expected dt partitions are missing")
    p.add_argument("--repartition", type=int, default=0, help="Optional number of partitions for output")
    return p.parse_args()


def daterange(start_dt: date, end_dt: date):
    d = start_dt
    out = []
    while d <= end_dt:
        out.append(d)
        d += timedelta(days=1)
    return out


def main():
    args = parse_args()

    start_dt = datetime.strptime(args.start_dt, "%Y-%m-%d").date()
    end_dt = datetime.strptime(args.end_dt, "%Y-%m-%d").date()
    if start_dt > end_dt:
        print("Error: start-dt must be <= end-dt", file=sys.stderr)
        sys.exit(2)

    allowed_event_types = [s.strip().lower() for s in args.allowed_event_types.split(",") if s.strip()]
    required_json_keys = [s.strip() for s in args.required_json_keys.split(",") if s.strip()]

    spark = (
        SparkSession.builder
        .appName("event-integrity-validation")
        .getOrCreate()
    )

    # Read input dataset
    df = spark.read.parquet(args.input_path)

    # Normalize types and values
    # - ensure event_type lowercase for enum validation
    # - ensure dt is date, event_time is timestamp
    df = df \
        .withColumn("event_type_norm", lower(trim(col("event_type")))) \
        .withColumn("dt", to_date(col("dt"))) \
        .withColumn("event_time", col("event_time").cast("timestamp")) \
        .withColumn("event_id", col("event_id").cast(StringType())) \
        .withColumn("user_id", col("user_id").cast(StringType())) \
        .withColumn("properties", col("properties").cast(StringType()))

    # Deduplicate by primary key event_id:
    # Keep the row with the latest event_time; on ties, keep the lexicographically max JSON to stabilize.
    # Mark dropped duplicates for invalid reporting.
    w = Window.partitionBy("event_id").orderBy(
        col("event_time").desc_nulls_last(),
        col("properties").desc_nulls_last()
    )
    df_with_rownum = df.withColumn("rn", row_number().over(w))
    canonical = df_with_rownum.filter(col("rn") == 1).drop("rn")
    duplicates_removed = df_with_rownum.filter(col("rn") > 1).drop("rn") \
        .withColumn("violations", array(lit("duplicate_removed")))

    # Core validations on the canonical set
    # Required non-null fields
    required_viol = array()
    required_viol = when(col("event_id").isNull(),
                         array_union(required_viol, array(lit("missing_required:event_id")))).otherwise(required_viol)
    required_viol = when(col("event_time").isNull(),
                         array_union(required_viol, array(lit("missing_required:event_time")))).otherwise(required_viol)
    required_viol = when(col("dt").isNull(),
                         array_union(required_viol, array(lit("missing_required:dt")))).otherwise(required_viol)

    # Enum validation (event_type cannot be null and must be in the allowed set)
    enum_valid = col("event_type_norm").isin(allowed_event_types)
    enum_viol = when(col("event_type_norm").isNull(), array(lit("invalid_enum:null"))) \
        .otherwise(when(enum_valid, array()).otherwise(array(lit("invalid_enum:event_type"))))

    # Time window validation:
    # the event_time date must be within [dt - late_lag_days, dt + future_drift_days]
    event_date = to_date(col("event_time"))
    min_allowed_date = date_sub(col("dt"), args.late_lag_days)
    max_allowed_date = date_add(col("dt"), args.future_drift_days)
    time_viol = when(
        col("event_time").isNull(),
        array(lit("time_window:event_time_null"))
    ).otherwise(
        when((event_date < min_allowed_date) | (event_date > max_allowed_date),
             array(lit("time_window:out_of_range"))).otherwise(array())
    )

    # JSON validation: properties is nullable; if present it must be a valid JSON object.
    # We parse it as Map<String,String> for structural validation; a failed parse yields null.
    json_schema = MapType(StringType(), StringType(), valueContainsNull=True)
    parsed_props = from_json(trim(col("properties")), json_schema)
    json_viol = when(
        col("properties").isNull(),
        array()
    ).otherwise(
        when(parsed_props.isNull(), array(lit("invalid_json:parse_error"))).otherwise(array())
    )

    # Required JSON keys (if provided): keys must exist and be non-empty when properties is not null
    if required_json_keys:
        for k in required_json_keys:
            json_viol = when(
                col("properties").isNull(),
                json_viol  # no requirement if properties is null
            ).otherwise(
                when(
                    (parsed_props.isNull()) |
                    (element_at(parsed_props, lit(k)).isNull()) |
                    (trim(element_at(parsed_props, lit(k))) == ""),
                    array_union(json_viol, array(lit(f"missing_json_key:{k}")))
                ).otherwise(json_viol)
            )

    # Aggregate violations for canonical records
    violations = array()
    violations = array_union(violations, required_viol)
    violations = array_union(violations, enum_viol)
    violations = array_union(violations, time_viol)
    violations = array_union(violations, json_viol)

    canonical_with_viol = canonical.withColumn("violations", violations)

    # Split valid vs invalid
    invalid_canonical = canonical_with_viol.filter(size(col("violations")) > 0)
    valid_canonical = canonical_with_viol.filter(size(col("violations")) == 0).drop("violations")

    # Combine invalid sets (duplicates + invalid canonical)
    invalid_all = invalid_canonical.unionByName(duplicates_removed, allowMissingColumns=True)

    # Partition completeness check
    expected_dates = [d.strftime("%Y-%m-%d") for d in daterange(start_dt, end_dt)]
    actual_dates = [r["dt"].strftime("%Y-%m-%d") for r in df.select("dt").distinct().collect()
                    if r["dt"] is not None]
    expected_set = set(expected_dates)
    actual_set = set(actual_dates)
    missing_partitions = sorted(expected_set - actual_set)

    # Write outputs
    if args.repartition > 0:
        valid_canonical = valid_canonical.repartition(args.repartition, col("dt"))
        invalid_all = invalid_all.repartition(args.repartition)

    # Write the valid dataset partitioned by dt to preserve downstream partitioning
    valid_canonical.write.mode("overwrite").partitionBy("dt").parquet(args.output_valid_path)
    invalid_all.write.mode("overwrite").parquet(args.output_invalid_path)

    # Metrics and summary
    total_in = df.count()
    total_valid = valid_canonical.count()
    total_invalid = invalid_all.count()
    total_duplicates = duplicates_removed.count()

    print("Validation Summary")
    print(f"- Input rows: {total_in}")
    print(f"- Valid (deduplicated) rows: {total_valid}")
    print(f"- Invalid rows: {total_invalid}")
    print(f"- Duplicates removed: {total_duplicates}")
    print(f"- Expected dt partitions: {len(expected_set)} [{args.start_dt}..{args.end_dt}]")
    print(f"- Actual dt partitions: {len(actual_set)}")
    if missing_partitions:
        print(f"- Missing dt partitions ({len(missing_partitions)}): {', '.join(missing_partitions)}")
        if args.fail_on_partition_missing:
            print("Failing due to missing partitions.", file=sys.stderr)
            spark.stop()
            sys.exit(1)
    else:
        print("- Partition completeness: OK")

    spark.stop()


if __name__ == "__main__":
    main()
```

Usage example:
- Validate a dataset on S3 over a 7-day completeness window, allowing 1 day of late arrival and no future drift; require the properties keys "item_id" and "price" when properties exists; fail if any partition is missing.

```
python validate_events.py \
  --input-path s3://bucket/events_parquet/ \
  --output-valid-path s3://bucket/events_valid/ \
  --output-invalid-path s3://bucket/events_invalid/ \
  --start-dt 2025-09-20 \
  --end-dt 2025-09-26 \
  --allowed-event-types view,click,purchase \
  --late-lag-days 1 \
  --future-drift-days 0 \
  --required-json-keys item_id,price \
  --fail-on-partition-missing \
  --repartition 200
```

Notes:
- Deduplication retains a single canonical row per event_id (latest event_time). Removed duplicates are captured in the invalid output with the reason "duplicate_removed".
- The JSON structure check ensures the properties field, if present, parses into an object (map). If parsing fails, the record is flagged.
- Time-window validation operates on the event date relative to dt; adjust late_lag_days/future_drift_days to fit your pipeline's allowed lateness.
- The script prints a concise summary and can hard-fail on missing partitions for strict SLA enforcement.
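For a quick local trial, the sketch below builds a tiny partitioned Parquet dataset with seeded defects (a duplicate event_id, a bad enum, an out-of-window event_time, an invalid JSON payload) that the validator above can then be pointed at. It assumes the script is saved as validate_events.py; the rows, the /tmp/events_parquet path, and the local[2] master are illustrative assumptions only.

```python
# Build a small test dataset for the validator above (assumed saved as validate_events.py).
# Rows, paths, and the local master are illustrative assumptions.
from datetime import datetime, date
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("make-test-events").getOrCreate()

rows = [
    # (event_id, event_time, user_id, event_type, properties, dt)
    ("e1", datetime(2025, 9, 20, 10, 0), "u1", "view", '{"item_id": "A1", "price": "9.9"}', date(2025, 9, 20)),
    ("e1", datetime(2025, 9, 20, 11, 0), "u1", "view", '{"item_id": "A1", "price": "9.9"}', date(2025, 9, 20)),  # duplicate event_id
    ("e2", datetime(2025, 9, 25, 12, 0), "u2", "swipe", None, date(2025, 9, 21)),  # bad enum + event_time outside window
    ("e3", None, "u3", "click", "not-json", date(2025, 9, 22)),  # null event_time + invalid JSON
]
df = spark.createDataFrame(rows, ["event_id", "event_time", "user_id", "event_type", "properties", "dt"])
df.write.mode("overwrite").partitionBy("dt").parquet("/tmp/events_parquet")
spark.stop()
```

Running the validator afterwards with --input-path /tmp/events_parquet --start-dt 2025-09-20 --end-dt 2025-09-26 should report the removed duplicate, the enum/time-window/JSON violations, and the missing dt partitions from 2025-09-23 onward.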
Below is a self-contained Python script that validates the integrity of a campaign reporting dataset. It checks schema conformance, funnel consistency (conversion <= click <= impression), date intervals, missing and anomalous values, channel enumeration, and currency consistency (when a currency column is present or specified).

Save it as validate_campaign_report.py and run:

```
python validate_campaign_report.py --input path/to/data.csv --output path/to/validation_report.csv --currency-col currency --expected-currency USD
```

If no currency column exists or none is specified, the currency check is skipped and noted in the summary.

```python
#!/usr/bin/env python3
"""
Validate campaign reporting dataset integrity.

Required columns:
- campaign_id: string (non-null)
- channel: enum in {"email", "ad", "app"}
- impression: int >= 0
- click: int >= 0
- conversion: int >= 0
- cost: decimal >= 0
- start_date: date
- end_date: date

Checks:
- Missing values in required fields
- Type and range: non-negative, integer for counts
- Funnel consistency: conversion <= click <= impression
- Date parsing and interval: start_date <= end_date
- Channel enumeration correctness
- Currency consistency (if currency column provided or detected):
  - Non-null currency values
  - Single unique currency or match against the expected currency

Outputs a CSV of row-level errors and prints a summary to stdout.
"""

import argparse
import sys
from typing import Dict, List, Optional, Tuple

import pandas as pd

REQUIRED_COLUMNS = [
    "campaign_id", "channel", "impression", "click",
    "conversion", "cost", "start_date", "end_date"
]

ALLOWED_CHANNELS = {"email", "ad", "app"}


def coerce_numeric(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_numeric, is_valid_numeric) with NaN for invalid entries."""
    s = pd.to_numeric(df[col], errors="coerce")
    valid = ~s.isna()
    return s, valid


def coerce_date(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_datetime, is_valid_date) with NaT for invalid entries."""
    s = pd.to_datetime(df[col], errors="coerce", utc=False)
    valid = ~s.isna()
    return s, valid


def validate_campaign_report(
    df: pd.DataFrame,
    currency_col: Optional[str] = None,
    expected_currency: Optional[str] = None
) -> Dict:
    errors: List[Dict] = []

    # Normalize column names by stripping whitespace
    df = df.copy()
    df.columns = [c.strip() for c in df.columns]

    # Check presence of required columns
    missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing_cols:
        return {
            "fatal": True,
            "message": f"Missing required columns: {missing_cols}",
            "errors": pd.DataFrame(columns=["row_index", "campaign_id", "field", "code", "message"]),
            "summary": {}
        }

    n_rows = len(df)

    # Validate campaign_id non-null and non-empty
    cid_null = df["campaign_id"].isna() | (df["campaign_id"].astype(str).str.strip() == "")
    for idx in df.index[cid_null]:
        errors.append({
            "row_index": idx,
            "campaign_id": None if pd.isna(df.loc[idx, "campaign_id"]) else str(df.loc[idx, "campaign_id"]),
            "field": "campaign_id",
            "code": "missing",
            "message": "campaign_id is null or empty"
        })

    # Validate channel: missing values
    ch_null = df["channel"].isna() | (df["channel"].astype(str).str.strip() == "")
    for idx in df.index[ch_null]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "channel",
            "code": "missing",
            "message": "channel is null or empty"
        })

    # Validate channel: allowed values
    ch_str = df["channel"].astype(str).str.strip().str.lower()
    ch_invalid = ~ch_str.isin(ALLOWED_CHANNELS)
    # Skip the invalid-enum flag for rows already reported as missing
    ch_invalid = ch_invalid & ~ch_null
    for idx in df.index[ch_invalid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "channel",
            "code": "enum_violation",
            "message": f"channel '{df.loc[idx, 'channel']}' not in {sorted(ALLOWED_CHANNELS)}"
        })

    # Numeric columns: impression, click, conversion, cost
    numeric_cols = ["impression", "click", "conversion", "cost"]
    parsed_numeric = {}
    valid_numeric = {}
    for col in numeric_cols:
        s_parsed, s_valid = coerce_numeric(df, col)
        parsed_numeric[col] = s_parsed
        valid_numeric[col] = s_valid

        # Missing or non-numeric
        invalid_mask = ~s_valid
        for idx in df.index[invalid_mask]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": col,
                "code": "type_error",
                "message": f"{col} is not numeric"
            })

        # Non-negative constraint
        nonneg_mask = s_parsed < 0
        for idx in df.index[nonneg_mask & s_valid]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": col,
                "code": "range_violation",
                "message": f"{col} must be >= 0"
            })

        # Integer constraint for count columns
        if col in {"impression", "click", "conversion"}:
            # NaN failures are already recorded; check integer-ness for valid values
            int_mask = s_parsed.dropna().apply(lambda x: float(x).is_integer())
            int_mask = int_mask.reindex(df.index, fill_value=False)
            non_integer = ~int_mask & s_valid
            for idx in df.index[non_integer]:
                errors.append({
                    "row_index": idx,
                    "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                    "field": col,
                    "code": "type_error",
                    "message": f"{col} must be an integer count"
                })

    # Funnel consistency: conversion <= click <= impression
    conv = parsed_numeric["conversion"]
    clk = parsed_numeric["click"]
    imp = parsed_numeric["impression"]
    # Only evaluate rows where all three counts are valid
    funnel_valid = valid_numeric["conversion"] & valid_numeric["click"] & valid_numeric["impression"]
    funnel_violation = funnel_valid & ~((conv <= clk) & (clk <= imp))
    for idx in df.index[funnel_violation]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "funnel",
            "code": "consistency_error",
            "message": f"conversion({conv.loc[idx]}) <= click({clk.loc[idx]}) <= impression({imp.loc[idx]}) violated"
        })

    # Date columns
    start_parsed, start_valid = coerce_date(df, "start_date")
    end_parsed, end_valid = coerce_date(df, "end_date")
    for idx in df.index[~start_valid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "start_date",
            "code": "type_error",
            "message": "start_date is invalid or cannot be parsed"
        })
    for idx in df.index[~end_valid]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "end_date",
            "code": "type_error",
            "message": "end_date is invalid or cannot be parsed"
        })

    # Date interval: start_date <= end_date
    date_interval_valid = start_valid & end_valid
    interval_violation = date_interval_valid & (start_parsed > end_parsed)
    for idx in df.index[interval_violation]:
        errors.append({
            "row_index": idx,
            "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
            "field": "date_range",
            "code": "range_violation",
            "message": f"start_date({start_parsed.loc[idx].date()}) > end_date({end_parsed.loc[idx].date()})"
        })

    # Currency consistency (optional)
    currency_check_status = "skipped"
    currency_unique_values = None
    if currency_col is None:
        # Try to auto-detect common currency column names
        for candidate in ["currency", "cost_currency"]:
            if candidate in df.columns:
                currency_col = candidate
                break

    if currency_col is not None and currency_col in df.columns:
        currency_check_status = "performed"
        cur_series = df[currency_col].astype(str).str.strip()
        cur_missing = cur_series.isna() | (cur_series == "") | (cur_series.str.lower() == "nan")
        for idx in df.index[cur_missing]:
            errors.append({
                "row_index": idx,
                "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                "field": currency_col,
                "code": "missing",
                "message": f"{currency_col} is null or empty"
            })

        currency_unique_values = sorted(cur_series[~cur_missing].unique().tolist())

        if expected_currency is not None:
            # Flag non-missing values that differ from the expected currency
            mism = (~cur_missing) & (cur_series != expected_currency)
            for idx in df.index[mism]:
                errors.append({
                    "row_index": idx,
                    "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                    "field": currency_col,
                    "code": "consistency_error",
                    "message": f"Currency '{cur_series.loc[idx]}' != expected '{expected_currency}'"
                })
        else:
            # Require a single currency across the dataset
            if len(set(cur_series[~cur_missing])) > 1:
                # Report a dataset-level inconsistency with per-row notes.
                # For simplicity, flag all non-missing rows when multiple currencies exist.
                for idx in df.index[~cur_missing]:
                    errors.append({
                        "row_index": idx,
                        "campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
                        "field": currency_col,
                        "code": "consistency_error",
                        "message": f"Multiple currencies present in dataset: {currency_unique_values}"
                    })
    else:
        currency_check_status = "skipped_no_column"

    # Build errors DataFrame
    errors_df = pd.DataFrame(errors, columns=["row_index", "campaign_id", "field", "code", "message"])
    errors_df.sort_values(by=["row_index", "field"], inplace=True)

    # Summary metrics
    summary = {
        "row_count": int(n_rows),
        "error_count": int(len(errors_df)),
        "missing_campaign_id": int(((df["campaign_id"].isna()) |
                                    (df["campaign_id"].astype(str).str.strip() == "")).sum()),
        "invalid_channel": int(ch_invalid.sum()),
        "missing_channel": int(ch_null.sum()),
        "invalid_impression": int((~valid_numeric["impression"]).sum()),
        "invalid_click": int((~valid_numeric["click"]).sum()),
        "invalid_conversion": int((~valid_numeric["conversion"]).sum()),
        "invalid_cost": int((~valid_numeric["cost"]).sum()),
        "negative_impression": int((parsed_numeric["impression"] < 0).sum()),
        "negative_click": int((parsed_numeric["click"] < 0).sum()),
        "negative_conversion": int((parsed_numeric["conversion"] < 0).sum()),
        "negative_cost": int((parsed_numeric["cost"] < 0).sum()),
        "funnel_inconsistency": int(funnel_violation.sum()),
        "invalid_start_date": int((~start_valid).sum()),
        "invalid_end_date": int((~end_valid).sum()),
        "date_range_violation": int(interval_violation.sum()),
        "currency_check_status": currency_check_status,
        "currency_unique_values": currency_unique_values if currency_unique_values is not None else []
    }

    return {
        "fatal": False,
        "message": "Validation completed",
        "errors": errors_df,
        "summary": summary
    }


def main():
    parser = argparse.ArgumentParser(description="Validate campaign report dataset integrity.")
    parser.add_argument("--input", required=True, help="Path to input CSV file")
    parser.add_argument("--output", required=False, help="Path to output CSV file for row-level errors")
    parser.add_argument("--currency-col", required=False, help="Currency column name (e.g., 'currency')")
    parser.add_argument("--expected-currency", required=False, help="Expected currency code (e.g., 'USD')")
    parser.add_argument("--delimiter", required=False, default=",", help="CSV delimiter (default ',')")
    parser.add_argument("--encoding", required=False, default="utf-8", help="File encoding (default 'utf-8')")
    args = parser.parse_args()

    try:
        df = pd.read_csv(args.input, delimiter=args.delimiter, encoding=args.encoding)
    except Exception as e:
        print(f"Failed to read input file: {e}", file=sys.stderr)
        sys.exit(2)

    result = validate_campaign_report(df, currency_col=args.currency_col,
                                      expected_currency=args.expected_currency)

    if result.get("fatal", False):
        print(f"Fatal error: {result.get('message')}", file=sys.stderr)
        sys.exit(1)

    # Print summary
    summary = result["summary"]
    print("Validation Summary")
    print(f"Rows: {summary['row_count']}")
    print(f"Total errors: {summary['error_count']}")
    print(f"Missing campaign_id: {summary['missing_campaign_id']}")
    print(f"Missing channel: {summary['missing_channel']}")
    print(f"Invalid channel enum: {summary['invalid_channel']}")
    print(f"Invalid impression: {summary['invalid_impression']} | Negative impression: {summary['negative_impression']}")
    print(f"Invalid click: {summary['invalid_click']} | Negative click: {summary['negative_click']}")
    print(f"Invalid conversion: {summary['invalid_conversion']} | Negative conversion: {summary['negative_conversion']}")
    print(f"Invalid cost: {summary['invalid_cost']} | Negative cost: {summary['negative_cost']}")
    print(f"Funnel inconsistencies: {summary['funnel_inconsistency']}")
    print(f"Invalid start_date: {summary['invalid_start_date']}")
    print(f"Invalid end_date: {summary['invalid_end_date']}")
    print(f"Date range violations: {summary['date_range_violation']}")
    print(f"Currency check: {summary['currency_check_status']}")
    if summary["currency_unique_values"]:
        print(f"Currencies detected: {summary['currency_unique_values']}")

    # Write per-row errors if requested
    if args.output:
        try:
            result["errors"].to_csv(args.output, index=False)
            print(f"Row-level errors written to {args.output}")
        except Exception as e:
            print(f"Failed to write output errors CSV: {e}", file=sys.stderr)
            sys.exit(3)


if __name__ == "__main__":
    main()
```
When onboarding a new data source or merging multiple tables, run a quick data-quality health check and auto-generate validation and repair scripts, so reports and insights are not distorted by dirty data.
Insert validations at key points in the data pipeline and generate rules and reports in one click, catching anomalies early, reducing rollbacks and rework, and keeping data ingestion and delivery stable.
Before a campaign goes live, verify the completeness of key fields and the consistency of metric definitions, fix data gaps in time, keep metrics comparable and post-mortems trustworthy, and reduce misjudgment and disputes.
Before training, check sample consistency, label anomalies, and distribution drift, and output handling suggestions, improving model stability and launch success while preventing data bias from degrading results.
Automatically generate traceable inspection reports and records to satisfy compliance reviews, quickly locate the problematic step, and reduce data-risk exposure and audit cost.
Build a standardized validation checklist and execute it in batches, continuously monitor data health, drive every domain toward quality targets with a closed improvement loop, and establish a data-quality governance mechanism.
Turn complex data-integrity checks into a single reusable prompt: it auto-generates runnable Python scripts, tailors the checks to the characteristics of your data, and quickly pinpoints missing values, duplicates, anomalies, and rule conflicts. It helps data teams set up a standardized pre-load quality gate before reports go live, models are trained, or data is delivered, saving labor, lowering risk, and raising data credibility; explanations can be output in Chinese or English for cross-team collaboration. The trial gets you a script in three steps; a paid plan unlocks advanced validation templates, result summaries, and team-collaboration features, upgrading data quality from "experience-based" to "rule-based".
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing easy. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in an MCP client so that your AI application calls the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly across AI tools.