Popular roles are more than a source of inspiration; they are productivity assistants. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs. Creation becomes easier, and the value more direct.
We keep the role library updated for different user needs, so you can always find a suitable entry point for inspiration.
Generate Python scripts for data integrity validation, precise and efficient.
Below is a ready-to-run Python script that uses pandas to validate the integrity of an orders table plus a users table, writing both issue details and a summary. The checks cover primary/foreign key consistency, amount ranges, channel enumeration, date validity with timezone normalization, and missing and duplicate values.
Notes
- Adjust the configuration constants at the top of the script (file paths, target timezone, allowed channel values, and the tolerated date range) to match your data; the script requires pandas and pytz.
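To try the script end to end, you can first generate a pair of tiny fixture files with the required columns. The rows below are invented illustration data (with a few deliberate problems), not part of the original answer.
# Illustrative only: write minimal orders.csv / users.csv fixtures for a dry run.
import pandas as pd
pd.DataFrame({
    "order_id": ["1001", "1002", "1002", "1003"],          # 1002 is a duplicate primary key
    "order_date": ["2024-05-01", "2024-05-02T10:15:00+08:00", "2024-05-02 10:20:00", "not-a-date"],
    "customer_id": ["C01", "C02", "C02", "C99"],            # C99 has no matching user (FK orphan)
    "amount": ["99.90", "-5", "12,000.50", ""],              # includes a negative and a missing amount
    "channel": ["web", "APP", "app", "phone"],               # "phone" violates the enum
    "currency": ["CNY", "", "CNY", "usd"],                   # missing and non-ISO-looking values
}).to_csv("orders.csv", index=False)
pd.DataFrame({
    "customer_id": ["C01", "C02", "C02"],                    # duplicate user primary key
    "signup_date": ["2024-04-30", "2024-05-10", "2024-05-10"],
    "region": ["east", "north", "north"],
}).to_csv("users.csv", index=False)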
Script
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
from datetime import datetime, timedelta
import pandas as pd
import pytz
############################
# Configuration
############################
ORDERS_CSV = "orders.csv"   # Path to the orders CSV; required columns: order_id, order_date, customer_id, amount, channel, currency
USERS_CSV = "users.csv"     # Path to the users CSV; required columns: customer_id, signup_date, region
OUTPUT_DIR = "output"       # Directory for issue-detail output files
TARGET_TZ = "Asia/Shanghai" # Target timezone for datetime normalization
ALLOWED_CHANNELS = {"web", "app"}  # Allowed channel enum values
# Reasonable date range (adjust as needed)
DATE_MIN = pd.to_datetime("1970-01-01").date()
DATE_MAX = (datetime.now(pytz.timezone(TARGET_TZ)) + timedelta(days=365)).date()  # tolerate up to one year in the future
############################
# Helper functions
############################
def ensure_output_dir(path: str):
    os.makedirs(path, exist_ok=True)
def read_csv_as_strings(csv_path: str, required_cols: set):
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"File not found: {csv_path}")
    # Read everything as strings to avoid losing leading zeros or getting scientific notation;
    # strict type validation happens later.
    df = pd.read_csv(csv_path, dtype=str, keep_default_na=False, na_values=["", "NA", "NaN", "null", "NULL"])
    # Normalize column names to lowercase
    df.columns = [c.strip().lower() for c in df.columns]
    missing = required_cols - set(df.columns)
    if missing:
        raise ValueError(f"File {csv_path} is missing required columns: {missing}")
    # Strip leading/trailing whitespace from string columns
    for c in df.columns:
        if pd.api.types.is_string_dtype(df[c]):
            df[c] = df[c].str.strip()
    return df
def to_nullable_int(series: pd.Series):
    # Allow nulls; validate as integers (no decimals)
    s = series.copy()
    # Drop thousands separators and surrounding spaces
    s = s.str.replace(",", "", regex=False)
    # Pre-screen with a regex for valid integers (optional sign) so values like "1.0" are not misclassified
    mask_nonempty = s.notna() & (s != "")
    mask_intlike = s.str.match(r"^[+-]?\d+$", na=False)
    out = pd.Series(pd.NA, index=s.index, dtype="Int64")
    out.loc[mask_nonempty & mask_intlike] = pd.to_numeric(s[mask_nonempty & mask_intlike], errors="coerce").astype("Int64")
    return out
def to_nullable_decimal(series: pd.Series):
    s = series.copy()
    s = s.str.replace(",", "", regex=False)
    # Decimals allowed; returns float64 and keeps NaN for unparseable values
    out = pd.to_numeric(s, errors="coerce")
    return out
def normalize_channel(series: pd.Series):
    s = series.str.lower().str.strip()
    return s
def parse_to_local_date(series: pd.Series, target_tz: str):
    """
    Rules:
    - A bare date (YYYY-MM-DD) is parsed directly as a date.
    - A timestamp:
        * with a timezone offset/marker: convert to UTC, then to the target timezone, then take the date;
        * without a timezone: localize to the target timezone, then take the date.
    Returns:
    - dates: pd.Series of python date (or NaT), dtype=object
    - meta: dict with parsing counters
    """
    tz = pytz.timezone(target_tz)
    raw = series.astype("string").str.strip()
    mask_empty = raw.isna() | (raw == "")
    mask_date_only = raw.str.match(r"^\d{4}-\d{2}-\d{2}$", na=False)
    mask_has_tz = raw.str.contains(r"(?:Z|[+-]\d{2}:\d{2}|[+-]\d{4}|UTC|GMT)", case=False, na=False)
    mask_timestamp = ~mask_empty & ~mask_date_only
    dates = pd.Series(pd.NaT, index=raw.index, dtype="object")  # holds python date or NaT
    # Bare dates
    if mask_date_only.any():
        d = pd.to_datetime(raw[mask_date_only], format="%Y-%m-%d", errors="coerce")
        dates.loc[mask_date_only] = d.dt.date
    # Timestamps with timezone information
    mask_ts_tz = mask_timestamp & mask_has_tz
    if mask_ts_tz.any():
        dt_utc = pd.to_datetime(raw[mask_ts_tz], errors="coerce", utc=True)
        dt_local = dt_utc.dt.tz_convert(tz)
        dates.loc[mask_ts_tz] = dt_local.dt.date
    # Naive timestamps -> localize to the target timezone
    mask_ts_naive = mask_timestamp & (~mask_has_tz)
    if mask_ts_naive.any():
        dt_naive = pd.to_datetime(raw[mask_ts_naive], errors="coerce", utc=False)
        try:
            dt_localized = dt_naive.dt.tz_localize(tz)
        except TypeError:
            # If dt_naive is already tz-aware (rare mixed input), convert instead of localizing
            dt_localized = dt_naive.dt.tz_convert(tz)
        dates.loc[mask_ts_naive] = dt_localized.dt.date
    meta = {
        "empty": int(mask_empty.sum()),
        "date_only": int(mask_date_only.sum()),
        "timestamp_with_tz": int(mask_ts_tz.sum()),
        "timestamp_naive": int(mask_ts_naive.sum()),
        "parsed_na": int(pd.isna(dates).sum())
    }
    return dates, meta
def regex_iso_currency(series: pd.Series):
    # Lightweight check that the currency code looks like ISO 4217: three uppercase letters
    s = series.astype("string")
    return s.str.fullmatch(r"[A-Z]{3}", na=False)
############################
# Validation logic
############################
def validate_orders(df_orders: pd.DataFrame):
    issues = {}
    # 1) Primary key and required-field constraints
    df = df_orders.copy()
    df["order_id_int"] = to_nullable_int(df["order_id"])
    issues["order_id_null"] = df[df["order_id_int"].isna()][["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]
    # Duplicate primary keys
    dup_order = df["order_id_int"].duplicated(keep=False) & df["order_id_int"].notna()
    issues["order_pk_duplicates"] = df[dup_order][["order_id", "customer_id", "order_date", "amount", "channel", "currency"]]
    # 2) Non-null constraint: customer_id
    issues["order_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][["order_id", "customer_id"]]
    # 3) Amount must be a decimal >= 0
    df["amount_num"] = to_nullable_decimal(df["amount"])
    issues["amount_not_numeric"] = df[df["amount"].notna() & (df["amount"] != "") & df["amount_num"].isna()][["order_id", "amount"]]
    issues["amount_negative"] = df[df["amount_num"].notna() & (df["amount_num"] < 0)][["order_id", "amount", "amount_num"]]
    # 4) Channel enum
    df["channel_norm"] = normalize_channel(df["channel"])
    issues["channel_null"] = df[df["channel_norm"].isna() | (df["channel_norm"] == "")][["order_id", "channel"]]
    issues["channel_invalid_enum"] = df[df["channel_norm"].notna() & (df["channel_norm"] != "") & (~df["channel_norm"].isin(ALLOWED_CHANNELS))][["order_id", "channel"]]
    # 5) Currency default/format (default CNY: an empty value means the default was not applied; non-empty values are not restricted)
    issues["currency_missing_should_default"] = df[df["currency"].isna() | (df["currency"] == "")][["order_id", "currency"]]
    # Optional: check for three uppercase letters (informational)
    fmt_mask = regex_iso_currency(df["currency"])
    issues["currency_format_non_iso_like_info"] = df[(df["currency"].notna()) & (df["currency"] != "") & (~fmt_mask)][["order_id", "currency"]]
    # 6) Date validity and timezone normalization
    order_dates, meta_order = parse_to_local_date(df["order_date"], TARGET_TZ)
    df["order_date_norm"] = order_dates
    issues["order_date_unparseable"] = df[pd.isna(df["order_date_norm"]) & df["order_date"].notna() & (df["order_date"] != "")][["order_id", "order_date"]]
    # Date range
    mask_order_range_invalid = (~pd.isna(df["order_date_norm"])) & ((df["order_date_norm"] < DATE_MIN) | (df["order_date_norm"] > DATE_MAX))
    issues["order_date_out_of_range"] = df[mask_order_range_invalid][["order_id", "order_date", "order_date_norm"]]
    # 7) Full-row duplicates in the orders table (e.g., from repeated exports)
    issues["order_fullrow_duplicates"] = df[df.duplicated(subset=["order_id", "order_date", "customer_id", "amount", "channel", "currency"], keep=False)]
    return df, issues, {"order_date_meta": meta_order}
def validate_users(df_users: pd.DataFrame):
    issues = {}
    df = df_users.copy()
    # 1) Primary key and required-field constraints
    issues["user_customer_id_null"] = df[df["customer_id"].isna() | (df["customer_id"] == "")][["customer_id", "signup_date", "region"]]
    dup_user = df["customer_id"].duplicated(keep=False) & df["customer_id"].notna() & (df["customer_id"] != "")
    issues["user_pk_duplicates"] = df[dup_user][["customer_id", "signup_date", "region"]]
    # 2) Date validity and timezone normalization
    signup_dates, meta_user = parse_to_local_date(df["signup_date"], TARGET_TZ)
    df["signup_date_norm"] = signup_dates
    issues["signup_date_unparseable"] = df[pd.isna(df["signup_date_norm"]) & df["signup_date"].notna() & (df["signup_date"] != "")][["customer_id", "signup_date"]]
    mask_signup_range_invalid = (~pd.isna(df["signup_date_norm"])) & ((df["signup_date_norm"] < DATE_MIN) | (df["signup_date_norm"] > DATE_MAX))
    issues["signup_date_out_of_range"] = df[mask_signup_range_invalid][["customer_id", "signup_date", "signup_date_norm"]]
    return df, issues, {"signup_date_meta": meta_user}
def check_foreign_keys(df_orders_norm: pd.DataFrame, df_users_norm: pd.DataFrame):
    # Left join to find orphan orders (foreign key mismatches)
    merged = df_orders_norm.merge(
        df_users_norm[["customer_id", "signup_date_norm", "region"]],
        on="customer_id",
        how="left",
        indicator=True,
        suffixes=("", "_user")
    )
    orphan = merged[merged["_merge"] == "left_only"]
    # Optional business consistency: order date >= signup date (when both are valid)
    has_both = merged["order_date_norm"].notna() & merged["signup_date_norm"].notna()
    violates_chronology = merged[has_both & (merged["order_date_norm"] < merged["signup_date_norm"])]
    fk_issues = {
        "fk_orphan_orders": orphan[["order_id", "customer_id", "order_date_norm", "signup_date_norm", "region"]],
        "order_before_signup": violates_chronology[["order_id", "customer_id", "order_date_norm", "signup_date_norm"]]
    }
    return merged, fk_issues
def save_issues(issues: dict, prefix: str):
ensure_output_dir(OUTPUT_DIR)
for name, df in issues.items():
if df is None or len(df) == 0:
continue
out_path = os.path.join(OUTPUT_DIR, f"{prefix}__{name}.csv")
df.to_csv(out_path, index=False, encoding="utf-8-sig")
def print_summary(total_orders: int, total_users: int, issues_all: dict, metas: dict):
    print("=== Data integrity validation summary ===")
    print(f"- Orders table rows: {total_orders}")
    print(f"- Users table rows: {total_users}")
    # Date-parsing metadata
    if "order_date_meta" in metas:
        m = metas["order_date_meta"]
        print(f"- order_date parsing: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")
    if "signup_date_meta" in metas:
        m = metas["signup_date_meta"]
        print(f"- signup_date parsing: date_only={m['date_only']}, ts_with_tz={m['timestamp_with_tz']}, ts_naive={m['timestamp_naive']}, empty={m['empty']}, parsed_na={m['parsed_na']}")
    # Issue counts per check
    print("\n=== Issue counts ===")
    for section, sec_issues in issues_all.items():
        for name, df in sec_issues.items():
            cnt = 0 if df is None else len(df)
            print(f"[{section}] {name}: {cnt}")
    print(f"\nDetails written to directory: {OUTPUT_DIR}\n")
def main():
    try:
        # Read inputs
        orders_req = {"order_id", "order_date", "customer_id", "amount", "channel", "currency"}
        users_req = {"customer_id", "signup_date", "region"}
        df_orders_raw = read_csv_as_strings(ORDERS_CSV, orders_req)
        df_users_raw = read_csv_as_strings(USERS_CSV, users_req)
        # Validate each table
        df_orders_norm, order_issues, order_meta = validate_orders(df_orders_raw)
        df_users_norm, user_issues, user_meta = validate_users(df_users_raw)
        # Foreign keys and merge
        merged, fk_issues = check_foreign_keys(df_orders_norm, df_users_norm)
        # Save issue details
        save_issues(order_issues, "orders")
        save_issues(user_issues, "users")
        save_issues(fk_issues, "fk")
        # Optionally write a sample of the normalized (merged) data
        sample_out = os.path.join(OUTPUT_DIR, "merged_sample.csv")
        merged.head(1000).to_csv(sample_out, index=False, encoding="utf-8-sig")
        # Print summary
        issues_all = {"orders": order_issues, "users": user_issues, "merge_fk": fk_issues}
        metas = {"order_date_meta": order_meta["order_date_meta"], "signup_date_meta": user_meta["signup_date_meta"]}
        print_summary(len(df_orders_raw), len(df_users_raw), issues_all, metas)
        # Use the exit code to signal whether "severe" problems exist
        severe_counts = 0
        severe_counts += len(order_issues["order_id_null"])
        severe_counts += len(order_issues["order_pk_duplicates"])
        severe_counts += len(order_issues["order_customer_id_null"])
        severe_counts += len(order_issues["amount_negative"])
        severe_counts += len(order_issues["channel_null"]) + len(order_issues["channel_invalid_enum"])
        severe_counts += len(order_issues["order_date_unparseable"])
        severe_counts += len(user_issues["user_customer_id_null"])
        severe_counts += len(user_issues["user_pk_duplicates"])
        severe_counts += len(user_issues["signup_date_unparseable"])
        severe_counts += len(fk_issues["fk_orphan_orders"])
        if severe_counts > 0:
            sys.exit(2)
        else:
            sys.exit(0)
    except Exception as e:
        print(f"Run failed: {e}", file=sys.stderr)
        sys.exit(1)
if __name__ == "__main__":
    main()
Usage
- Run the script with Python 3 after placing orders.csv and users.csv next to it (or adjusting the paths in the configuration block).
- Issue details are written as CSV files under the output directory, a summary is printed to stdout, and the exit code is 2 when severe problems are found, 0 when the data is clean, and 1 on runtime failure.
Key checks explained
- Primary keys: order_id must be a non-null integer and unique; customer_id must be non-null and unique in the users table.
- Foreign key: every order's customer_id must exist in the users table; orders dated before the customer's signup date are also flagged.
- Amounts and enums: amount must parse as a number and be >= 0; channel must be one of the allowed values; an empty currency means the default (CNY) was not applied, and codes that are not three uppercase letters are reported as informational.
- Dates: order_date and signup_date are normalized to the target timezone (bare dates kept, tz-aware timestamps converted, naive timestamps localized) and must fall within the configured range.
- Duplicates: duplicated order primary keys and fully duplicated order rows are reported.
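As a quick illustration of the date normalization rules, the snippet below feeds a few representative strings to parse_to_local_date and prints the normalized local dates. It assumes the script above has been saved as validate_orders_users.py, a file name not fixed by the original answer.
# Illustrative only; assumes the script was saved as validate_orders_users.py.
import pandas as pd
from validate_orders_users import parse_to_local_date
samples = pd.Series([
    "2024-05-01",               # bare date: taken as-is
    "2024-05-01T23:30:00Z",     # UTC timestamp: becomes 2024-05-02 in Asia/Shanghai
    "2024-05-01 08:00:00",      # naive timestamp: localized to Asia/Shanghai
    "not-a-date",               # unparseable: reported as NaT
])
dates, meta = parse_to_local_date(samples, "Asia/Shanghai")
print(dates.tolist())
print(meta)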
Below is a self-contained PySpark script that validates an event dataset: it deduplicates on the primary key and checks partition completeness, time-window bounds, enum legality, JSON structure, and required non-null fields. It produces two outputs: a “clean” deduplicated dataset and an “invalid” dataset with violation reasons. It can also optionally fail the job if expected partitions are missing.
Assumptions:
- The input is Parquet partitioned by dt (a date), with columns event_id, user_id, event_type, event_time, and properties (a JSON object serialized as a string).
- event_id is the primary key; when duplicates occur, the row with the latest event_time is kept and the rest are reported as removed duplicates.
You can adjust configuration via command-line arguments.
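If you want to smoke-test the job locally before pointing it at production data, a tiny fixture matching the assumed schema can be written like this (purely illustrative; the local path and sample values are invented):
# Illustrative fixture for a local smoke test; schema follows the assumptions above.
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[2]").appName("make-sample-events").getOrCreate()
rows = [
    ("e1", "u1", "view",     "2025-09-20 10:00:00", '{"item_id": "A1", "price": "9.9"}', "2025-09-20"),
    ("e1", "u1", "view",     "2025-09-20 11:00:00", '{"item_id": "A1", "price": "9.9"}', "2025-09-20"),  # duplicate event_id
    ("e2", "u2", "purchase", "2025-09-23 08:00:00", "not json",                          "2025-09-21"),  # bad JSON, event_time outside window
    ("e3", None, "swipe",    "2025-09-22 09:00:00", None,                                "2025-09-22"),  # enum violation
]
df = spark.createDataFrame(rows, ["event_id", "user_id", "event_type", "event_time", "properties", "dt"])
df.write.mode("overwrite").partitionBy("dt").parquet("/tmp/events_parquet")
spark.stop()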
Python script (PySpark):
#!/usr/bin/env python3
import argparse
import sys
from datetime import datetime, date, timedelta
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import (
    col, lower, trim, to_date, date_add, date_sub, when, lit,
    array, array_union, size, from_json, element_at, row_number
)
from pyspark.sql.types import StringType, MapType
def parse_args():
p = argparse.ArgumentParser(description="ETL integrity validation for event dataset")
p.add_argument("--input-path", required=True, help="Input dataset path (Parquet), partitioned by dt")
p.add_argument("--output-valid-path", required=True, help="Output path for valid, deduplicated dataset")
p.add_argument("--output-invalid-path", required=True, help="Output path for invalid records with reasons")
p.add_argument("--start-dt", required=True, help="Expected partition range start (YYYY-MM-DD)")
p.add_argument("--end-dt", required=True, help="Expected partition range end (YYYY-MM-DD)")
p.add_argument("--allowed-event-types", default="view,click,purchase", help="Comma-separated allowed event types")
p.add_argument("--late-lag-days", type=int, default=1, help="Max allowed lateness (days) wrt dt")
p.add_argument("--future-drift-days", type=int, default=0, help="Max allowed future drift (days) wrt dt")
p.add_argument("--required-json-keys", default="", help="Comma-separated keys required within properties JSON (if properties is not null)")
p.add_argument("--fail-on-partition-missing", action="store_true", help="Fail job if any expected dt partitions are missing")
p.add_argument("--repartition", type=int, default=0, help="Optional number of partitions for output")
return p.parse_args()
def daterange(start_dt: date, end_dt: date):
d = start_dt
out = []
while d <= end_dt:
out.append(d)
d += timedelta(days=1)
return out
def main():
args = parse_args()
start_dt = datetime.strptime(args.start_dt, "%Y-%m-%d").date()
end_dt = datetime.strptime(args.end_dt, "%Y-%m-%d").date()
if start_dt > end_dt:
print("Error: start-dt must be <= end-dt", file=sys.stderr)
sys.exit(2)
allowed_event_types = [s.strip().lower() for s in args.allowed_event_types.split(",") if s.strip()]
required_json_keys = [s.strip() for s in args.required_json_keys.split(",") if s.strip()]
spark = (
SparkSession.builder
.appName("event-integrity-validation")
.getOrCreate()
)
# Read input dataset
df = spark.read.parquet(args.input_path)
# Normalize types and values
# - ensure event_type lowercase for enum validation
# - ensure dt is date, event_time is timestamp
df = df \
.withColumn("event_type_norm", lower(trim(col("event_type")))) \
.withColumn("dt", to_date(col("dt"))) \
.withColumn("event_time", col("event_time").cast("timestamp")) \
.withColumn("event_id", col("event_id").cast(StringType())) \
.withColumn("user_id", col("user_id").cast(StringType())) \
.withColumn("properties", col("properties").cast(StringType()))
    # Deduplicate by primary key event_id:
    # keep the row with the latest event_time; in a tie, keep the lexicographically max JSON to stabilize.
    # Dropped duplicates are marked for invalid reporting.
    w = Window.partitionBy("event_id").orderBy(
        col("event_time").desc_nulls_last(),
        col("properties").desc_nulls_last()
    )
    df_with_rownum = df.withColumn("rn", row_number().over(w))
    canonical = df_with_rownum.filter(col("rn") == 1).drop("rn")
    duplicates_removed = df_with_rownum.filter(col("rn") > 1).drop("rn") \
        .withColumn("violations", array(lit("duplicate_removed")))
# Core validations on canonical set
# Required non-null fields
required_viol = array()
required_viol = when(col("event_id").isNull(), array_union(required_viol, array(lit("missing_required:event_id")))).otherwise(required_viol)
required_viol = when(col("event_time").isNull(), array_union(required_viol, array(lit("missing_required:event_time")))).otherwise(required_viol)
required_viol = when(col("dt").isNull(), array_union(required_viol, array(lit("missing_required:dt")))).otherwise(required_viol)
# Enum validation (event_type cannot be null and must be in allowed set)
enum_valid = col("event_type_norm").isin(allowed_event_types)
enum_viol = when(col("event_type_norm").isNull(), array(lit("invalid_enum:null"))) \
.otherwise(when(enum_valid, array()).otherwise(array(lit("invalid_enum:event_type"))))
# Time window validation:
# Allowed event_time date must be within [dt - late_lag_days, dt + future_drift_days]
event_date = to_date(col("event_time"))
min_allowed_date = date_sub(col("dt"), args.late_lag_days)
max_allowed_date = date_add(col("dt"), args.future_drift_days)
time_viol = when(
col("event_time").isNull(), array(lit("time_window:event_time_null"))
).otherwise(
when((event_date < min_allowed_date) | (event_date > max_allowed_date),
array(lit("time_window:out_of_range"))).otherwise(array())
)
# JSON validation: properties is nullable; if present must be a valid JSON object.
# We parse as Map<String,String> for structural validation. If parse fails => null.
    json_schema = MapType(StringType(), StringType(), valueContainsNull=True)
parsed_props = from_json(trim(col("properties")), json_schema)
json_viol = when(
col("properties").isNull(), array()
).otherwise(
when(parsed_props.isNull(), array(lit("invalid_json:parse_error"))).otherwise(array())
)
# Required JSON keys (if provided): keys must exist and be non-empty when properties is not null
if required_json_keys:
# For each required key, check existence and non-empty value
for k in required_json_keys:
json_viol = when(
col("properties").isNull(), json_viol # no requirement if properties is null
).otherwise(
when(
(parsed_props.isNull()) | (element_at(parsed_props, lit(k)).isNull()) | (trim(element_at(parsed_props, lit(k))) == ""),
array_union(json_viol, array(lit(f"missing_json_key:{k}")))
).otherwise(json_viol)
)
# Aggregate violations for canonical records
violations = array()
violations = array_union(violations, required_viol)
violations = array_union(violations, enum_viol)
violations = array_union(violations, time_viol)
violations = array_union(violations, json_viol)
canonical_with_viol = canonical.withColumn("violations", violations)
# Split valid vs invalid
invalid_canonical = canonical_with_viol.filter(size(col("violations")) > 0)
valid_canonical = canonical_with_viol.filter(size(col("violations")) == 0).drop("violations")
# Combine invalid sets (duplicates + invalid canonical)
invalid_all = invalid_canonical.unionByName(duplicates_removed, allowMissingColumns=True)
# Partition completeness check
expected_dates = [d.strftime("%Y-%m-%d") for d in daterange(start_dt, end_dt)]
actual_dates = [r["dt"].strftime("%Y-%m-%d") for r in df.select("dt").distinct().collect() if r["dt"] is not None]
expected_set = set(expected_dates)
actual_set = set(actual_dates)
missing_partitions = sorted(expected_set - actual_set)
# Write outputs
if args.repartition > 0:
valid_canonical = valid_canonical.repartition(args.repartition, col("dt"))
invalid_all = invalid_all.repartition(args.repartition)
# Write valid dataset partitioned by dt to preserve downstream partitioning
valid_canonical.write.mode("overwrite").partitionBy("dt").parquet(args.output_valid_path)
invalid_all.write.mode("overwrite").parquet(args.output_invalid_path)
# Metrics and summary
total_in = df.count()
total_valid = valid_canonical.count()
total_invalid = invalid_all.count()
total_duplicates = duplicates_removed.count()
print("Validation Summary")
print(f"- Input rows: {total_in}")
print(f"- Valid (deduplicated) rows: {total_valid}")
print(f"- Invalid rows: {total_invalid}")
print(f"- Duplicates removed: {total_duplicates}")
print(f"- Expected dt partitions: {len(expected_set)} [{args.start_dt}..{args.end_dt}]")
print(f"- Actual dt partitions: {len(actual_set)}")
if missing_partitions:
print(f"- Missing dt partitions ({len(missing_partitions)}): {', '.join(missing_partitions)}")
if args.fail_on_partition_missing:
print("Failing due to missing partitions.", file=sys.stderr)
spark.stop()
sys.exit(1)
else:
print("- Partition completeness: OK")
spark.stop()
if __name__ == "__main__":
main()
Usage example:
python validate_events.py \
--input-path s3://bucket/events_parquet/ \
--output-valid-path s3://bucket/events_valid/ \
--output-invalid-path s3://bucket/events_invalid/ \
--start-dt 2025-09-20 \
--end-dt 2025-09-26 \
--allowed-event-types view,click,purchase \
--late-lag-days 1 \
--future-drift-days 0 \
--required-json-keys item_id,price \
--fail-on-partition-missing \
--repartition 200
Notes:
- array_union de-duplicates violation codes, so each distinct reason appears at most once per row.
- Rows dropped during deduplication are written to the invalid output with the single reason duplicate_removed.
- The partition completeness check compares the expected dt values in [start-dt, end-dt] against the distinct dt values actually present in the input.
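A common follow-up is to see which rules fire most often. The sketch below (reusing the illustrative S3 path from the usage example) reads the invalid output and counts rows per violation code:
# Illustrative follow-up: count invalid rows per violation code.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode
spark = SparkSession.builder.appName("violation-breakdown").getOrCreate()
invalid = spark.read.parquet("s3://bucket/events_invalid/")  # path from the usage example above
(invalid
    .select(explode(col("violations")).alias("violation"))
    .groupBy("violation")
    .count()
    .orderBy(col("count").desc())
    .show(truncate=False))
spark.stop()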
Below is a self-contained Python script that validates the integrity of a campaign reporting dataset. It checks schema conformance, funnel consistency (conversion <= click <= impression), date intervals, missing and anomalous values, channel enumeration, and currency consistency (if a currency column is present or specified).
You can save it as validate_campaign_report.py and run:
python validate_campaign_report.py --input path/to/data.csv --output path/to/validation_report.csv --currency-col currency --expected-currency USD
If no currency column exists and none is specified, the currency check is skipped and noted in the summary.
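To exercise the validator without real data, you can write a small invented fixture like the one below (the file name sample.csv and all values are illustrative) and then run the command shown above against it.
# Illustrative only: build a small sample.csv to exercise validate_campaign_report.py.
import pandas as pd
pd.DataFrame({
    "campaign_id": ["CMP-1", "CMP-2", "",      "CMP-4"],
    "channel":     ["email", "sms",   "ad",    "app"],          # "sms" violates the enum; one campaign_id is empty
    "impression":  [1000,    500,     "abc",   300],            # non-numeric value in row 3
    "click":       [120,     600,     10,      30],             # click > impression on CMP-2
    "conversion":  [10,      5,       2,       40],             # conversion > click on CMP-4
    "cost":        [99.5,    -1.0,    20.0,    15.0],           # negative cost on CMP-2
    "start_date":  ["2025-01-01", "2025-02-10", "2025-03-01", "2025-04-05"],
    "end_date":    ["2025-01-31", "2025-02-01", "2025-03-31", "2025-04-30"],  # end before start on CMP-2
    "currency":    ["USD", "USD", "EUR", "USD"],                 # mixed currencies
}).to_csv("sample.csv", index=False)
# Then run: python validate_campaign_report.py --input sample.csv --output validation_report.csv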
#!/usr/bin/env python3
"""
Validate campaign reporting dataset integrity.
Required columns:
- campaign_id: string (non-null)
- channel: enum in {"email", "ad", "app"}
- impression: int >= 0
- click: int >= 0
- conversion: int >= 0
- cost: decimal >= 0
- start_date: date
- end_date: date
Checks:
- Missing values in required fields
- Type and range: non-negative, integer for counts
- Funnel consistency: conversion <= click <= impression
- Date parsing and interval: start_date <= end_date
- Channel enumeration correctness
- Currency consistency (if currency column provided or detected):
- Non-null currency values
- Single unique currency or match expected currency
Outputs a CSV of row-level errors and prints a summary to stdout.
"""
import argparse
import sys
import pandas as pd
from typing import Dict, List, Optional, Tuple
REQUIRED_COLUMNS = [
"campaign_id", "channel", "impression", "click",
"conversion", "cost", "start_date", "end_date"
]
ALLOWED_CHANNELS = {"email", "ad", "app"}
def coerce_numeric(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
"""Return (parsed_numeric, is_valid_numeric) with NaN for invalid entries."""
s = pd.to_numeric(df[col], errors="coerce")
valid = ~s.isna()
return s, valid
def coerce_date(df: pd.DataFrame, col: str) -> Tuple[pd.Series, pd.Series]:
    """Return (parsed_datetime, is_valid_date) with NaT for invalid entries."""
    s = pd.to_datetime(df[col], errors="coerce", utc=False)
    valid = ~s.isna()
    return s, valid
def validate_campaign_report(
df: pd.DataFrame,
currency_col: Optional[str] = None,
expected_currency: Optional[str] = None
) -> Dict:
errors: List[Dict] = []
# Normalize column names by stripping whitespace
df = df.copy()
df.columns = [c.strip() for c in df.columns]
# Check presence of required columns
missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
if missing_cols:
return {
"fatal": True,
"message": f"Missing required columns: {missing_cols}",
"errors": pd.DataFrame(columns=["row_index", "campaign_id", "field", "code", "message"]),
"summary": {}
}
n_rows = len(df)
# Validate campaign_id non-null and non-empty
cid_null = df["campaign_id"].isna() | (df["campaign_id"].astype(str).str.strip() == "")
for idx in df.index[cid_null]:
errors.append({
"row_index": idx,
"campaign_id": None if pd.isna(df.loc[idx, "campaign_id"]) else str(df.loc[idx, "campaign_id"]),
"field": "campaign_id",
"code": "missing",
"message": "campaign_id is null or empty"
})
# Validate channel enumeration and missing
ch_null = df["channel"].isna() | (df["channel"].astype(str).str.strip() == "")
for idx in df.index[ch_null]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": "channel",
"code": "missing",
"message": "channel is null or empty"
})
# allowed values
ch_str = df["channel"].astype(str).str.strip().str.lower()
ch_invalid = ~ch_str.isin(ALLOWED_CHANNELS)
# Skip invalid flag for rows already missing
ch_invalid = ch_invalid & ~ch_null
for idx in df.index[ch_invalid]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": "channel",
"code": "enum_violation",
"message": f"channel '{df.loc[idx, 'channel']}' not in {sorted(ALLOWED_CHANNELS)}"
})
# Numeric columns: impression, click, conversion, cost
numeric_cols = ["impression", "click", "conversion", "cost"]
parsed_numeric = {}
valid_numeric = {}
for col in numeric_cols:
s_parsed, s_valid = coerce_numeric(df, col)
parsed_numeric[col] = s_parsed
valid_numeric[col] = s_valid
# Missing or non-numeric
invalid_mask = ~s_valid
for idx in df.index[invalid_mask]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": col,
"code": "type_error",
"message": f"{col} is not numeric"
})
        # Non-negative constraint: flag negative values
        neg_mask = (s_parsed < 0) & s_valid
        for idx in df.index[neg_mask]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": col,
"code": "range_violation",
"message": f"{col} must be >= 0"
})
# Integer constraint for count columns
if col in {"impression", "click", "conversion"}:
# Allow NaN failures already recorded; check integer-ness for valid values
int_mask = s_parsed.dropna().apply(lambda x: float(x).is_integer())
int_mask = int_mask.reindex(df.index, fill_value=False)
non_integer = ~int_mask & s_valid
for idx in df.index[non_integer]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": col,
"code": "type_error",
"message": f"{col} must be an integer count"
})
# Funnel consistency: conversion <= click <= impression
conv = parsed_numeric["conversion"]
clk = parsed_numeric["click"]
imp = parsed_numeric["impression"]
# Only evaluate on rows where all three are valid and non-negative integers
funnel_valid = valid_numeric["conversion"] & valid_numeric["click"] & valid_numeric["impression"]
# Apply comparison
funnel_violation = funnel_valid & ~((conv <= clk) & (clk <= imp))
for idx in df.index[funnel_violation]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": "funnel",
"code": "consistency_error",
"message": f"conversion({conv.loc[idx]}) <= click({clk.loc[idx]}) <= impression({imp.loc[idx]}) violated"
})
# Date columns
start_parsed, start_valid = coerce_date(df, "start_date")
end_parsed, end_valid = coerce_date(df, "end_date")
for idx in df.index[~start_valid]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": "start_date",
"code": "type_error",
"message": "start_date is invalid or cannot be parsed"
})
for idx in df.index[~end_valid]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": "end_date",
"code": "type_error",
"message": "end_date is invalid or cannot be parsed"
})
# Date interval: start_date <= end_date
date_interval_valid = start_valid & end_valid
interval_violation = date_interval_valid & (start_parsed > end_parsed)
for idx in df.index[interval_violation]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": "date_range",
"code": "range_violation",
"message": f"start_date({start_parsed.loc[idx].date()}) > end_date({end_parsed.loc[idx].date()})"
})
# Currency consistency (optional)
currency_check_status = "skipped"
currency_unique_values = None
if currency_col is None:
# Try to auto-detect common currency column names
for candidate in ["currency", "cost_currency"]:
if candidate in df.columns:
currency_col = candidate
break
if currency_col is not None and currency_col in df.columns:
currency_check_status = "performed"
cur_series = df[currency_col].astype(str).str.strip()
cur_missing = cur_series.isna() | (cur_series == "") | (cur_series.str.lower() == "nan")
for idx in df.index[cur_missing]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": currency_col,
"code": "missing",
"message": f"{currency_col} is null or empty"
})
currency_unique_values = sorted(cur_series[~cur_missing].unique().tolist())
        if expected_currency is not None:
            # Align the mismatch mask with the full index so boolean indexing works even when some currencies are missing
            mism = (cur_series != expected_currency) & ~cur_missing
            for idx in df.index[mism]:
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": currency_col,
"code": "consistency_error",
"message": f"Currency '{cur_series.loc[idx]}' != expected '{expected_currency}'"
})
else:
# Require single currency across dataset
if len(set(cur_series[~cur_missing])) > 1:
# Report a dataset-level inconsistency with per-row notes
for idx in df.index[~cur_missing]:
# Only flag rows that contribute to inconsistency (i.e., not the modal value)
# For simplicity, flag all non-missing rows when multiple currencies exist
errors.append({
"row_index": idx,
"campaign_id": str(df.loc[idx, "campaign_id"]) if not pd.isna(df.loc[idx, "campaign_id"]) else None,
"field": currency_col,
"code": "consistency_error",
"message": f"Multiple currencies present in dataset: {currency_unique_values}"
})
else:
currency_check_status = "skipped_no_column"
# Build errors DataFrame
errors_df = pd.DataFrame(errors, columns=["row_index", "campaign_id", "field", "code", "message"])
errors_df.sort_values(by=["row_index", "field"], inplace=True)
# Summary metrics
summary = {
"row_count": int(n_rows),
"error_count": int(len(errors_df)),
"missing_campaign_id": int(((df["campaign_id"].isna()) | (df["campaign_id"].astype(str).str.strip() == "")).sum()),
"invalid_channel": int(ch_invalid.sum()),
"missing_channel": int(ch_null.sum()),
"invalid_impression": int((~valid_numeric["impression"]).sum()),
"invalid_click": int((~valid_numeric["click"]).sum()),
"invalid_conversion": int((~valid_numeric["conversion"]).sum()),
"invalid_cost": int((~valid_numeric["cost"]).sum()),
"negative_impression": int((parsed_numeric["impression"] < 0).sum()),
"negative_click": int((parsed_numeric["click"] < 0).sum()),
"negative_conversion": int((parsed_numeric["conversion"] < 0).sum()),
"negative_cost": int((parsed_numeric["cost"] < 0).sum()),
"funnel_inconsistency": int(funnel_violation.sum()),
"invalid_start_date": int((~start_valid).sum()),
"invalid_end_date": int((~end_valid).sum()),
"date_range_violation": int(interval_violation.sum()),
"currency_check_status": currency_check_status,
"currency_unique_values": currency_unique_values if currency_unique_values is not None else []
}
return {
"fatal": False,
"message": "Validation completed",
"errors": errors_df,
"summary": summary
}
def main():
parser = argparse.ArgumentParser(description="Validate campaign report dataset integrity.")
parser.add_argument("--input", required=True, help="Path to input CSV file")
parser.add_argument("--output", required=False, help="Path to output CSV file for row-level errors")
parser.add_argument("--currency-col", required=False, help="Currency column name (e.g., 'currency')")
parser.add_argument("--expected-currency", required=False, help="Expected currency code (e.g., 'USD')")
parser.add_argument("--delimiter", required=False, default=",", help="CSV delimiter (default ',')")
parser.add_argument("--encoding", required=False, default="utf-8", help="File encoding (default 'utf-8')")
args = parser.parse_args()
try:
df = pd.read_csv(args.input, delimiter=args.delimiter, encoding=args.encoding)
except Exception as e:
print(f"Failed to read input file: {e}", file=sys.stderr)
sys.exit(2)
result = validate_campaign_report(df, currency_col=args.currency_col, expected_currency=args.expected_currency)
if result.get("fatal", False):
print(f"Fatal error: {result.get('message')}", file=sys.stderr)
sys.exit(1)
# Print summary
summary = result["summary"]
print("Validation Summary")
print(f"Rows: {summary['row_count']}")
print(f"Total errors: {summary['error_count']}")
print(f"Missing campaign_id: {summary['missing_campaign_id']}")
print(f"Missing channel: {summary['missing_channel']}")
print(f"Invalid channel enum: {summary['invalid_channel']}")
print(f"Invalid impression: {summary['invalid_impression']} | Negative impression: {summary['negative_impression']}")
print(f"Invalid click: {summary['invalid_click']} | Negative click: {summary['negative_click']}")
print(f"Invalid conversion: {summary['invalid_conversion']} | Negative conversion: {summary['negative_conversion']}")
print(f"Invalid cost: {summary['invalid_cost']} | Negative cost: {summary['negative_cost']}")
print(f"Funnel inconsistencies: {summary['funnel_inconsistency']}")
print(f"Invalid start_date: {summary['invalid_start_date']}")
print(f"Invalid end_date: {summary['invalid_end_date']}")
print(f"Date range violations: {summary['date_range_violation']}")
print(f"Currency check: {summary['currency_check_status']}")
if summary["currency_unique_values"]:
print(f"Currencies detected: {summary['currency_unique_values']}")
# Write per-row errors if requested
if args.output:
try:
result["errors"].to_csv(args.output, index=False)
print(f"Row-level errors written to {args.output}")
except Exception as e:
print(f"Failed to write output errors CSV: {e}", file=sys.stderr)
sys.exit(3)
if __name__ == "__main__":
main()
Turn complex data integrity checks into a single reusable prompt: it automatically generates a runnable Python script, tailors the checks to the characteristics of your data, and quickly pinpoints missing values, duplicates, anomalies, and rule conflicts. It helps data teams establish a standardized pre-load quality gate before reports go live, models are trained, or data is delivered, saving time, reducing risk, and improving data trustworthiness; explanations can be output in Chinese or English for cross-team collaboration. A trial gets you a script in three steps; paid plans unlock advanced validation templates, result summaries, and team collaboration, upgrading data quality from "relying on experience" to "following a documented process".
When taking over a new data source or merging multiple tables, quickly health-check data quality and auto-generate validation and repair scripts, so reports and insights are not distorted by dirty data.
Insert validation at key points in the data pipeline and generate rules and reports in one click, intercepting anomalies early, reducing rollbacks and rework, and stabilizing data ingestion and delivery.
Before a campaign launches, verify that key fields are complete and definitions are consistent, fix data gaps in time, keep metrics comparable and post-mortems trustworthy, and reduce misjudgments and disputes.
Copy the prompt generated by the template into your preferred chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight scenarios.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing easy. Suitable for developer integration and embedding in business systems.
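As a purely hypothetical sketch of that integration pattern (the endpoint URL, parameter names, and authentication header below are invented placeholders, not this platform's actual API), a Python program might call such an interface like this:
# Hypothetical example only: endpoint URL, parameters, and auth are illustrative placeholders.
import requests
resp = requests.post(
    "https://api.example.com/v1/prompt-templates/data-integrity/render",  # placeholder URL
    headers={"Authorization": "Bearer YOUR_API_KEY"},                      # placeholder credential
    json={"table_schema": "orders(order_id, amount, channel, ...)",        # template parameters you control
          "language": "en"},
    timeout=30,
)
resp.raise_for_status()
print(resp.json())  # the rendered prompt or generated validation script would be in the response body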
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between different AI tools.