Data Integrity Check Script Generator

Updated Dec 18, 2025

Given a dataset's characteristics and validation requirements, this tool quickly generates executable Python scripts for systematic data-integrity verification. The scripts cover core checks such as missing values, duplicates, and outliers, standardize data-quality assessment, keep results reproducible, and provide reliable support for data cleaning, analysis, and modeling.

Below is a reusable PySpark data-integrity validation script covering the ten specified check categories. It scales to the stated data volume (4.8 million rows, 45 columns), supports CSV/Parquet input, uses consistent definitions and a modular implementation, and outputs a JSON report plus samples of problem rows. Business rules and enum configuration can be adjusted as needed.

1. Scope and Assumptions

  • Data source: in-house e-commerce order system and payment gateway, loaded in T+1 batches.
  • Data volume: roughly 4.8 million rows over the last 3 months, 45 columns.
  • Primary key: order_id.
  • Core type constraints:
    • Integer / decimal / timestamp / enum / boolean types stay consistent: quantity, unit_price (2 decimal places), discount (decimal), pay_amount (decimal), pay_time/ship_time (timestamps), currency/status/channel (enums), is_refund (boolean).
  • Business characteristics:
    • Pronounced holiday peaks (not treated as anomalies).
    • Installment payments and wallet deductions exist: pay_amount = quantity*unit_price - discount is not enforced as an equality; a weak constraint is used instead (pay_amount must not exceed the gross item total; deductions may only lower it) — see the sketch after this list.
  • Specific values of status/province are not hard-coded; they are configurable, and by default only format/non-null and logical-relationship checks are applied.
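Because installments and wallet deductions are allowed, the amount relationship is enforced only as an upper bound rather than an equality. A minimal sketch of that weak constraint, assuming the typed columns (quantity_t, unit_price_t, pay_amount_t) produced by the script in section 4:

from pyspark.sql import functions as F
from pyspark.sql import types as T

def weak_amount_violations(df, epsilon: float = 0.0001):
    # Rows where the paid amount exceeds the gross item total (quantity * unit_price)
    gross = F.col("quantity_t").cast(T.DecimalType(18, 4)) * F.col("unit_price_t")
    return df.filter(F.col("pay_amount_t") > gross + F.lit(epsilon))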

2. Check Coverage

  • Missing values: required columns must not be NULL; missing counts are reported for the remaining columns.
  • Duplicates: primary-key uniqueness plus samples of duplicated rows.
  • Outliers (statistical): IQR-based detection on quantity, unit_price, and pay_amount.
  • Outliers (business rules): hard constraints on amounts, timestamps, refund logic, etc.
  • Type consistency: cast to the expected schema and count rows that fail to cast.
  • Primary-key uniqueness: order_id must be unique.
  • Value ranges: e.g. quantity >= 1, amounts non-negative for non-refund orders.
  • Format checks: currency-code format, prices limited to 2 decimal places, etc.
  • Logical consistency: time ordering (ship_time >= pay_time), refund amount sign, etc.
  • Time-series continuity: day-level continuity of pay_time (with T+1 ingestion, data extends only up to the current date minus 1).

3. How to Run

  • Dependencies: Spark 3.x, Python 3.8+.
  • Example invocation (a sketch of the emitted report structure follows):

    spark-submit integrity_check_orders.py \
      --input-path s3://bucket/orders_3m/ \
      --input-format parquet \
      --output-dir s3://bucket/quality_report/ \
      --max-sample-rows 10000 \
      --currencies CNY \
      --t1-lag-days 1
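For reference, each check appends one entry to the JSON report. A minimal illustration of the resulting structure (field names follow the add_check helper in the script below; the values shown here are made up):

example_report = {
    "generated_at": "2025-01-01T00:00:00Z",
    "checks": [
        {
            "name": "Missing-value check",
            "status": "FAIL",                      # PASS / WARN / FAIL
            "metrics": {"order_id": 0, "pay_time_t": 123},
            "sample": "s3://bucket/quality_report/samples/missing_required",
            "note": "Required columns must not be NULL",
        }
    ],
}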

4. PySpark Script: integrity_check_orders.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

import argparse
import json
import sys
from datetime import datetime, date, timedelta

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T

def parse_args():
    p = argparse.ArgumentParser(description="Order dataset integrity checks")
    p.add_argument("--input-path", required=True, help="Input path (CSV/Parquet)")
    p.add_argument("--input-format", required=True, choices=["csv", "parquet"], help="Input format")
    p.add_argument("--output-dir", required=True, help="Output directory for reports and samples")
    p.add_argument("--max-sample-rows", type=int, default=10000, help="Max sample rows per failed check")
    p.add_argument("--currencies", type=str, default="CNY",
                   help="Comma-separated valid currency codes (3-letter). If unknown, format check only.")
    p.add_argument("--t1-lag-days", type=int, default=1,
                   help="T+1 ingestion lag days (used to bound pay_time upper limit)")
    p.add_argument("--timezone", type=str, default="Asia/Shanghai", help="Timezone for date boundaries")
    return p.parse_args()

def build_spark():
    return (SparkSession.builder
            .appName("order_integrity_checks")
            .config("spark.sql.session.timeZone", "UTC")  # do arithmetic in UTC; adjust display if needed
            .getOrCreate())

def get_schema():
    # Use a permissive schema for robustness; cast later and track failures.
    # unit_price: 2 decimal places is a business rule; Decimal(18, 4) is used as an interim type
    # so that values with >2 decimals can be caught by the format check.
    return T.StructType([
        T.StructField("order_id", T.StringType(), True),
        T.StructField("user_id", T.StringType(), True),
        T.StructField("sku_id", T.StringType(), True),
        T.StructField("item_name", T.StringType(), True),
        T.StructField("quantity", T.StringType(), True),    # read as string first
        T.StructField("unit_price", T.StringType(), True),
        T.StructField("currency", T.StringType(), True),
        T.StructField("discount", T.StringType(), True),
        T.StructField("pay_amount", T.StringType(), True),
        T.StructField("pay_time", T.StringType(), True),
        T.StructField("ship_time", T.StringType(), True),
        T.StructField("province", T.StringType(), True),
        T.StructField("status", T.StringType(), True),
        T.StructField("channel", T.StringType(), True),
        T.StructField("is_refund", T.StringType(), True),
        # ... other ~30 columns kept as-is
    ])

def cast_columns(df: DataFrame) -> (DataFrame, DataFrame):
    # Keep raw columns for format checks, create typed columns for logic checks.
    # Track cast failures per column.
    # Decimal types: store as Decimal(18, 4) to safely perform rounding checks;
    # the business rule limits unit_price to 2 decimals.
    typed = (df.withColumn("quantity_t", F.col("quantity").cast("long"))
               .withColumn("unit_price_t", F.col("unit_price").cast(T.DecimalType(18, 4)))
               .withColumn("discount_t", F.col("discount").cast(T.DecimalType(18, 4)))
               .withColumn("pay_amount_t", F.col("pay_amount").cast(T.DecimalType(18, 4)))
               .withColumn("pay_time_t", F.to_timestamp("pay_time"))
               .withColumn("ship_time_t", F.to_timestamp("ship_time"))
               .withColumn("is_refund_t",
                           F.when(F.lower(F.col("is_refund")).isin("true", "1", "t", "yes"), F.lit(True))
                            .when(F.lower(F.col("is_refund")).isin("false", "0", "f", "no"), F.lit(False))
                            .otherwise(None)))
    # Cast error flags: original non-null but typed null
    cast_errors = []
    for orig, typed_col in [
        ("quantity", "quantity_t"), ("unit_price", "unit_price_t"), ("discount", "discount_t"),
        ("pay_amount", "pay_amount_t"), ("pay_time", "pay_time_t"),
        ("ship_time", "ship_time_t"), ("is_refund", "is_refund_t")
    ]:
        cast_errors.append(
            F.when((F.col(orig).isNotNull()) & (F.col(typed_col).isNull()), F.lit(orig))
        )
    # Keep only the names of columns that actually failed (drop the NULL placeholders)
    cast_err_col = F.filter(F.array(*cast_errors), lambda x: x.isNotNull())
    typed = typed.withColumn("type_error_cols", cast_err_col)
    return typed, typed.filter(F.size(F.col("type_error_cols")) > 0)

def write_sample(df: DataFrame, path: str, n: int):
    if df is None:
        return 0
    count = df.limit(n).count()
    if count > 0:
        df.limit(n).coalesce(1).write.mode("overwrite").option("header", True).csv(path)
    return count

def to_date_in_tz(col_ts, tz):
    # Convert a timestamp to a date in the specified timezone
    return F.to_date(F.from_utc_timestamp(col_ts, tz))

def iqr_bounds(df: DataFrame, col: str):
    # Use approximate percentiles
    q = df.selectExpr(f"percentile_approx({col}, array(0.25, 0.75), 1000) as q").collect()[0]["q"]
    if q is None or len(q) < 2:
        return None
    q1, q3 = float(q[0]), float(q[1])
    iqr = q3 - q1
    low = q1 - 1.5 * iqr
    high = q3 + 1.5 * iqr
    return low, high

args = parse_args()
spark = build_spark()
spark.sparkContext.setLogLevel("WARN")

cfg = {
    "valid_currencies": [c.strip() for c in args.currencies.split(",") if c.strip()],
    "timezone": args.timezone,
    "t1_lag_days": args.t1_lag_days,
    "max_sample_rows": args.max_sample_rows
}

# 1) Load
if args.input_format == "csv":
    df = (spark.read
          .option("header", True)
          .option("multiLine", True)
          .option("escape", "\"")
          .schema(get_schema())
          .csv(args.input_path))
else:
    df = spark.read.parquet(args.input_path)
df = df.persist()

report = {"generated_at": datetime.utcnow().isoformat() + "Z",
          "checks": []}

# 2) Type casting and type errors
df_t, type_err_rows = cast_columns(df)
df_t = df_t.persist()

# Helper for adding check result
def add_check(name, status, metrics=None, sample_rel_path=None, note=None):
    entry = {"name": name, "status": status}
    if metrics is not None: entry["metrics"] = metrics
    if sample_rel_path is not None: entry["sample"] = sample_rel_path
    if note is not None: entry["note"] = note
    report["checks"].append(entry)

# 3) Missing-value check
required_cols = ["order_id", "user_id", "sku_id", "quantity_t",
                 "unit_price_t", "currency", "pay_amount_t", "pay_time_t",
                 "status", "channel", "is_refund_t"]
miss_metrics = {}
for c in required_cols:
    miss_metrics[c] = df_t.filter(F.col(c).isNull()).count()
missing_total = sum(miss_metrics.values())
status = "PASS" if missing_total == 0 else "FAIL"
sample_path = None
if missing_total > 0:
    sample_df = None
    cond = None
    for c in required_cols:
        cond = F.col(c).isNull() if cond is None else (cond | F.col(c).isNull())
    sample_df = df_t.filter(cond).select("order_id", *required_cols)
    sample_path = f"{args.output_dir}/samples/missing_required"
    write_sample(sample_df, sample_path, cfg["max_sample_rows"])
add_check("缺失值检查", status, miss_metrics, sample_path)

# 4) 数据类型一致性检查
type_err_cnt = type_err_rows.count()
type_status = "PASS" if type_err_cnt == 0 else "FAIL"
type_sample_path = None
if type_err_cnt > 0:
    type_sample_path = f"{args.output_dir}/samples/type_inconsistency"
    write_sample(type_err_rows.select("order_id", "type_error_cols", "quantity", "unit_price", "discount", "pay_amount", "pay_time", "ship_time", "is_refund"),
                 type_sample_path, cfg["max_sample_rows"])
add_check("数据类型一致性检查", type_status, {"type_error_rows": type_err_cnt}, type_sample_path)

# 5) 主键唯一性检查 + 重复值检查
dup_df = df_t.groupBy("order_id").count().filter(F.col("count") > 1)
dup_cnt = dup_df.count()
dup_status = "PASS" if dup_cnt == 0 else "FAIL"
dup_sample_path = None
if dup_cnt > 0:
    dup_ids = dup_df.select("order_id").limit(cfg["max_sample_rows"])
    dup_rows = df_t.join(dup_ids, on="order_id", how="inner")
    dup_sample_path = f"{args.output_dir}/samples/duplicate_order_id"
    write_sample(dup_rows, dup_sample_path, cfg["max_sample_rows"])
add_check("主键唯一性检查", dup_status, {"duplicate_order_ids": dup_cnt}, dup_sample_path)
add_check("重复值检查", dup_status, {"duplicate_order_ids": dup_cnt}, dup_sample_path)

# 6) 值域范围检查(硬约束)
# - quantity >= 1
# - unit_price_t >= 0
# - discount_t >= 0
# - is_refund_t=False => pay_amount_t >= 0
# - is_refund_t=True  => pay_amount_t <= 0 (允许0代表全额撤销后净额为0)
# - pay_amount 不应超过原价总额(弱约束,FAIL;如有零钱抵扣只会降低,不会升高)
gross_amount = (F.col("quantity_t").cast(T.DecimalType(18, 4)) * F.col("unit_price_t"))
vr_bad = df_t.filter(
    (F.col("quantity_t") < 1) |
    (F.col("unit_price_t") < 0) |
    (F.col("discount_t") < 0) |
    ((F.col("is_refund_t") == F.lit(False)) & (F.col("pay_amount_t") < 0)) |
    ((F.col("is_refund_t") == F.lit(True)) & (F.col("pay_amount_t") > 0)) |
    (F.col("pay_amount_t") > gross_amount + F.lit(0.0001)) |  # allow minimal epsilon
    (F.col("discount_t") > gross_amount + F.lit(0.0001))
)
vr_cnt = vr_bad.count()
vr_status = "PASS" if vr_cnt == 0 else "FAIL"
vr_sample_path = None
if vr_cnt > 0:
    vr_sample_path = f"{args.output_dir}/samples/value_range"
    write_sample(vr_bad.select("order_id", "quantity_t", "unit_price_t", "discount_t", "pay_amount_t", "is_refund_t"), vr_sample_path, cfg["max_sample_rows"])
add_check("值域范围检查", vr_status, {"invalid_rows": vr_cnt}, vr_sample_path,
          note="包含数量/金额非负与金额不超过原价总额的约束;考虑零钱抵扣不做等式约束。")

# 7) 格式规范性检查
# - currency: 3-letter uppercase;若提供valid_currencies,则校验集合。
# - unit_price应为2位小数以内:检查 round(unit_price, 2) == unit_price
fmt_bad_currency = None
if cfg["valid_currencies"]:
    fmt_bad_currency = df_t.filter(~F.col("currency").rlike("^[A-Z]{3}$") | (~F.col("currency").isin(cfg["valid_currencies"])))
else:
    fmt_bad_currency = df_t.filter(~F.col("currency").rlike("^[A-Z]{3}$"))

price_scale_bad = df_t.filter(F.col("unit_price_t").isNotNull() & (F.col("unit_price_t") != F.round(F.col("unit_price_t"), 2)))

fmt_cnt = fmt_bad_currency.count() + price_scale_bad.count()
fmt_status = "PASS" if fmt_cnt == 0 else "FAIL"
fmt_sample_path = None
if fmt_cnt > 0:
    fmt_sample_path = f"{args.output_dir}/samples/format_issues"
    sample_union = fmt_bad_currency.select("order_id", "currency").withColumn("issue", F.lit("currency")) \
        .unionByName(price_scale_bad.select("order_id", "currency").withColumn("issue", F.lit("unit_price_scale")))
    write_sample(sample_union, fmt_sample_path, cfg["max_sample_rows"])
add_check("格式规范性检查", fmt_status, {"issues": fmt_cnt}, fmt_sample_path,
          note="验证货币码格式/集合与单价小数位不超过2位。")

# 8) 逻辑一致性检查
# - ship_time存在时应 >= pay_time
# - pay_time 不应晚于 当前时间 - t1_lag_days(按T+1)
# - is_refund逻辑已在值域检查校验金额符号
tz = cfg["timezone"]
today_utc = F.current_timestamp()
# 上界:当前时间减去T+1滞后
upper_bound_ts = F.expr(f"timestampadd(day, -{cfg['t1_lag_days']}, current_timestamp())")

logic_bad = df_t.filter(
    ((F.col("ship_time_t").isNotNull()) & (F.col("pay_time_t").isNotNull()) & (F.col("ship_time_t") < F.col("pay_time_t"))) |
    (F.col("pay_time_t") > upper_bound_ts)  # pay_time不应在T+1上界之后
)
logic_cnt = logic_bad.count()
logic_status = "PASS" if logic_cnt == 0 else "FAIL"
logic_sample_path = None
if logic_cnt > 0:
    logic_sample_path = f"{args.output_dir}/samples/logic_inconsistency"
    write_sample(logic_bad.select("order_id", "pay_time_t", "ship_time_t", "is_refund_t"), logic_sample_path, cfg["max_sample_rows"])
add_check("逻辑一致性检查", logic_status, {"invalid_rows": logic_cnt}, logic_sample_path,
          note="包含ship_time>=pay_time与T+1滞后上界检查。")

# 9) 异常值检查(统计,IQR)
# 对 quantity、unit_price_t、pay_amount_t 分别计算IQR边界并识别异常
stat_metrics = {}
stat_outlier_df = None
for col in ["quantity_t", "unit_price_t", "pay_amount_t"]:
    # 只在非空且非负的范围内计算(金额非负在非退款场景已保证;退款为负值会被视作异常,故仅对非退款子集做金额IQR以避免误报)
    base_df = df_t.filter(F.col(col).isNotNull())
    if col in ["unit_price_t"]:
        base_df = base_df.filter(F.col(col) >= 0)
    if col == "pay_amount_t":
        base_df = base_df.filter(F.col("is_refund_t") == F.lit(False))

    if base_df.count() == 0:
        continue
    bounds = iqr_bounds(base_df.withColumn(col, F.col(col).cast("double")), col)
    if not bounds:
        continue
    low, high = bounds
    outliers = df_t.filter(F.col(col).isNotNull() & ((F.col(col) < low) | (F.col(col) > high)))
    cnt = outliers.count()
    stat_metrics[col] = {"lower": low, "upper": high, "outliers": cnt}
    if cnt > 0:
        stat_outlier_df = outliers.select("order_id", F.lit(col).alias("metric"), F.col(col).alias("value")) if stat_outlier_df is None \
            else stat_outlier_df.unionByName(outliers.select("order_id", F.lit(col).alias("metric"), F.col(col).alias("value")))

stat_status = "PASS" if all(m["outliers"] == 0 for m in stat_metrics.values()) else "WARN"
stat_sample_path = None
if stat_outlier_df is not None:
    stat_sample_path = f"{args.output_dir}/samples/stat_outliers"
    write_sample(stat_outlier_df, stat_sample_path, cfg["max_sample_rows"])
add_check("异常值检查(基于统计)", stat_status, stat_metrics, stat_sample_path,
          note="IQR方法;节假日销量峰值不在本检查范围(针对行级字段)。")

# 10) 异常值检查(基于业务规则)
# - pay_amount + discount <= quantity*unit_price + ε(允许抵扣导致更低;不允许超过)
# - 对退款单:允许 ship_time 有/无;不做更严金额等式约束
# 已在值域范围检查覆盖金额关系;此处补充负折扣、非整数数量、异常币种占比等业务风险提示
br_bad = df_t.filter(
    (F.col("quantity_t").cast("double") % 1 != 0) |  # 非整数数量
    (F.col("discount_t") < 0)
)
br_cnt = br_bad.count()
br_status = "PASS" if br_cnt == 0 else "FAIL"
br_sample_path = None
if br_cnt > 0:
    br_sample_path = f"{args.output_dir}/samples/business_rule_anomalies"
    write_sample(br_bad.select("order_id", "quantity_t", "discount_t"), br_sample_path, cfg["max_sample_rows"])
add_check("异常值检查(基于业务规则)", br_status, {"invalid_rows": br_cnt}, br_sample_path,
          note="补充业务约束:数量需为整数、折扣不为负;金额关系已在值域范围检查中验证。")

# 11) 时间序列连续性检查(按pay_time的自然日)
date_col = to_date_in_tz(F.col("pay_time_t"), cfg["timezone"]).alias("pay_date")
dfd = df_t.select(date_col).filter(F.col("pay_date").isNotNull())
if dfd.count() > 0:
    min_date, max_date = dfd.agg(F.min("pay_date"), F.max("pay_date")).first()
    # Generate the full date range
    all_dates = spark.range(0, (max_date - min_date).days + 1) \
        .withColumn("pay_date", F.expr(f"date_add(to_date('{min_date}'), cast(id as int))")) \
        .select("pay_date")
    present = dfd.groupBy("pay_date").count().select("pay_date")
    missing = all_dates.join(present, on="pay_date", how="left_anti")
    missing_cnt = missing.count()
    ts_status = "PASS" if missing_cnt == 0 else "WARN"
    miss_sample_path = None
    if missing_cnt > 0:
        miss_sample_path = f"{args.output_dir}/samples/missing_dates"
        write_sample(missing, miss_sample_path, cfg["max_sample_rows"])
    add_check("时间序列连续性检查", ts_status,
              {"min_date": str(min_date), "max_date": str(max_date), "missing_days": missing_cnt},
              miss_sample_path,
              note="按pay_time日期连续性;T+1采集不影响日期序列。")
else:
    add_check("时间序列连续性检查", "FAIL", {"reason": "无有效pay_time记录"})

# 12) Write the JSON report
report_path = f"{args.output_dir}/integrity_report.json"
with open("/tmp/integrity_report.json", "w", encoding="utf-8") as f:
    json.dump(report, f, ensure_ascii=False, indent=2)
# Persist the report to distributed storage
spark.createDataFrame([(json.dumps(report, ensure_ascii=False),)], ["json"]) \
    .coalesce(1).write.mode("overwrite").text(report_path)

# 13) Done
print(json.dumps(report, ensure_ascii=False))
spark.stop()

5. Notes and Tunables

  • Configurable options:
    • valid_currencies: defaults to ["CNY"]; for multiple currencies pass --currencies CNY,USD,HKD.
    • t1_lag_days: T+1 ingestion constraint; the upper-bound check is pay_time <= current time - t1_lag_days.
    • timezone: used for day-level continuity bucketing (default Asia/Shanghai).
  • Decimal precision:
    • unit_price is limited to 2 decimal places (format check). pay_amount and discount use Decimal(18,4) arithmetic to avoid precision loss; to enforce 2 decimals on them as well, add the same check (see the sketch after this list).
  • Refunds and installments:
    • Refund sign logic: is_refund=True => pay_amount <= 0; otherwise >= 0.
    • Installments / wallet deductions: no equality is enforced; a hard "not above gross total" constraint plus statistical outlier detection are used instead.
  • Enum fields:
    • status/channel/province are not bound to specific enum sets to avoid false positives; a dimension table or config can be plugged in for set validation.
  • Performance and resources:
    • 4.8 million rows fit comfortably in a single job; IQR uses approximate percentiles to avoid a full collect.
    • Sample output is capped by max-sample-rows (default 10,000 rows per check) to limit storage.
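If pay_amount and discount must also be limited to two decimal places, the unit_price scale check can be reused as-is. A minimal sketch, assuming the typed columns produced by the script above:

from pyspark.sql import functions as F

def scale_violations(df, cols=("pay_amount_t", "discount_t")):
    # Rows where any listed decimal column carries more than 2 decimal places
    cond = None
    for c in cols:
        bad = F.col(c).isNotNull() & (F.col(c) != F.round(F.col(c), 2))
        cond = bad if cond is None else (cond | bad)
    return df.filter(cond)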

6. Suggested Extensions

  • Dimension-table alignment:
    • Validate province values, channel/status enums, SKU whitelists, etc. by joining against dimension tables.
  • Stronger amount reconciliation:
    • Introduce wallet_deduction, coupon_amount, and shipping_fee fields and reconcile against the finance definition: gross = quantity*unit_price; paid = gross - coupon - wallet - discount +/- rounding.
  • Adaptive outlier thresholds:
    • Compute IQR per currency/channel group to reduce false positives caused by structural differences; window functions or groupBy + percentile_approx both work (see the sketch below).
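A sketch of the group-level IQR computation suggested above, using groupBy + percentile_approx; the group and value column names are placeholders:

from pyspark.sql import functions as F

def grouped_iqr_bounds(df, group_col, value_col, k=1.5):
    # Per-group Q1/Q3 and IQR-based bounds computed in a single aggregation
    q = df.groupBy(group_col).agg(
        F.expr(f"percentile_approx({value_col}, array(0.25, 0.75), 1000)").alias("q"))
    return (q.withColumn("q1", F.col("q")[0])
             .withColumn("q3", F.col("q")[1])
             .withColumn("iqr", F.col("q3") - F.col("q1"))
             .withColumn("lower", F.col("q1") - k * F.col("iqr"))
             .withColumn("upper", F.col("q3") + k * F.col("iqr"))
             .drop("q"))

# Usage: join the bounds back and keep the rows outside them, e.g.
# bounds = grouped_iqr_bounds(df_t, "currency", "pay_amount_t")
# outliers = df_t.join(bounds, "currency").filter(
#     (F.col("pay_amount_t") < F.col("lower")) | (F.col("pay_amount_t") > F.col("upper")))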

The script covers the ten required integrity checks, produces auditable reports and samples, and avoids unreasonable false positives given the business characteristics (holiday peaks, installments, wallet deductions).

Below is a reusable PySpark data-quality validation script for the described online loan-application dataset, covering the ten required check categories. The script is configurable, extensible, can switch between batch and streaming modes, and emits structured validation results plus sample details.

Suggested file name: dq_loan_app_validation.py

Dependencies:

  • Spark 3.x
  • Python 3.8+
  • Input data may be parquet/csv/json (selected via a parameter)

Usage example (batch):

    spark-submit dq_loan_app_validation.py \
      --input /data/loan_apps/2024/ \
      --input-format parquet \
      --output /data/dq_out/loan_apps/ \
      --streaming false

Code:

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

""" 贷款申请数据质量验证脚本 覆盖校验项:

  1. 缺失值检查
  2. 重复值检查
  3. 异常值检查(基于统计:IQR/极端分位)
  4. 异常值检查(基于业务规则:可配置)
  5. 数据类型一致性检查与统一
  6. 主键唯一性检查
  7. 值域范围检查(枚举/数值)
  8. 格式规范性检查(IP/哈希/时间)
  9. 逻辑一致性检查(跨字段)
  10. 时间序列连续性检查(按天/小时覆盖与缺口)

说明:

  • 仅对存在于数据中的字段执行相应校验,缺失字段跳过并记录。
  • 业务阈值使用可配置规则。默认值保守,建议依据历史数据调优。
  • 结果以结构化明细与汇总指标方式输出,便于审计与复用。 """

import argparse
import json
import sys
from datetime import datetime, timedelta

from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

# -------------------------------
# Configuration (adjust as needed, or externalize to JSON/YAML)
# -------------------------------

CONFIG = { "primary_key": "application_id", "timestamp_col": "app_datetime", "id_type_col": "id_type", "id_number_col": "id_number", # 已脱敏哈希 "core_numeric_cols": [ "age", "income_monthly", "debt_total", "loan_amount", "loan_term_months", "employment_length_years", "credit_score" ], "categorical_cols": [ "gender", "province", "marital_status", "education_level", "channel" ], "string_cols": ["device_id", "ip"], # 目标类型定义(统一类型) "schema_expectation": { "application_id": "string", "app_datetime": "timestamp", "id_type": "string", "id_number": "string", "age": "int", "gender": "string", "province": "string", "income_monthly": "double", "debt_total": "double", "loan_amount": "double", "loan_term_months": "int", "employment_length_years": "double", "marital_status": "string", "education_level": "string", "credit_score": "double", "device_id": "string", "ip": "string", "channel": "string" }, # 值域/枚举(如未知可留空,或仅做存在性检查) "enums": { "id_type": ["ID_CARD", "PASSPORT", "OTHER"], "gender": ["M", "F", "U", "UNKNOWN", "OTHER"], "marital_status": ["SINGLE", "MARRIED", "DIVORCED", "WIDOWED", "OTHER"], "education_level": ["HIGH_SCHOOL", "BACHELOR", "MASTER", "PHD", "OTHER"], "channel": [] # 若为空则不限制;否则精确匹配白名单 }, # 数值基本范围(保守默认,可根据历史数据调整;为None则跳过静态范围) "ranges": { "age": {"min": 18, "max": 75}, "loan_term_months": {"min": 1, "max": 120}, "employment_length_years": {"min": 0, "max": 60}, "income_monthly": {"min": 0, "max": None}, "debt_total": {"min": 0, "max": None}, "loan_amount": {"min": 0, "max": None}, "credit_score": {"min": 0, "max": None} }, # 业务规则(可配置开关与阈值) "business_rules": { # 时间不超过未来5分钟,且不早于今天-400天(约13个月) "app_datetime_future_minutes": 5, "app_datetime_past_days": 400, # DTI = debt_total/income_monthly:静态阈值与分位阈值取较宽松者 "max_dti_static": 5.0, # 若income=0则DTI设为null并记录异常 # 年龄与工作年限:age >= employment_length_years + 14(假设最早工作年龄14岁) "min_age_minus_employment_years": 14, # 重复申请:同(id_number, loan_amount, loan_term_months) 7天内重复视为重复 "repeat_within_days": 7 }, # 格式与规范 "format_checks": { # IPv4正则 "ipv4_regex": r"^(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d).){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$", # 哈希长度白名单(常见:MD5=32, SHA-1=40, SHA-256=64),未知时可放宽 "id_hash_lengths": [32, 40, 64], # device_id一般为字母数字与-,长度放宽 "device_id_regex": r"^[A-Za-z0-9-:.]{8,100}$", # 允许的省份列表若为空不校验 "province_whitelist": [] }, # 统计异常检测(IQR):乘数1.5为常规,3为极端 "iqr_multiplier": 1.5, "extreme_iqr_multiplier": 3.0, # 输出采样 "sample_rows": 100 }

-------------------------------

工具函数

-------------------------------

def spark_type_from_str(t: str):
    m = {
        "string": T.StringType(), "int": T.IntegerType(), "bigint": T.LongType(),
        "double": T.DoubleType(), "float": T.FloatType(), "boolean": T.BooleanType(),
        "timestamp": T.TimestampType(), "date": T.DateType()
    }
    return m.get(t, T.StringType())

def ensure_columns(df: DataFrame, cols):
    exists = [c for c in cols if c in df.columns]
    missing = [c for c in cols if c not in df.columns]
    return exists, missing

def cast_with_flag(df: DataFrame, col: str, to_type: str):
    target_type = spark_type_from_str(to_type)
    cast_col = F.col(col).cast(target_type)
    # A cast failure is a non-null source value that becomes null after casting
    ok_col = (F.when(F.col(col).isNull(), F.lit(True))
               .when(cast_col.isNull() & F.col(col).isNotNull(), F.lit(False))
               .otherwise(F.lit(True)))
    # Add the flag before overwriting the column so it is evaluated against the raw value
    df2 = (df.withColumn(f"{col}__cast_ok", ok_col)
             .withColumn(col, cast_col))
    return df2

def unify_types(df: DataFrame, schema_expectation: dict):
    df2 = df
    present = [c for c in schema_expectation.keys() if c in df.columns]
    for c in present:
        df2 = cast_with_flag(df2, c, schema_expectation[c])
    return df2

def approx_iqr_bounds(df: DataFrame, col: str, multiplier: float):
    q = df.approxQuantile(col, [0.25, 0.75], 0.001)
    if not q or len(q) < 2:
        return None
    q1, q3 = q
    if q1 is None or q3 is None:
        return None
    iqr = q3 - q1
    lower = q1 - multiplier * iqr
    upper = q3 + multiplier * iqr
    return lower, upper

def write_df(df: DataFrame, path: str):
    if df is None:
        return
    df.coalesce(1).write.mode("overwrite").parquet(path)

# -------------------------------
# Check logic
# -------------------------------

def check_missingness(df: DataFrame, cfg: dict) -> DataFrame: cols = [c for c in df.columns if not c.endswith("__cast_ok")] exprs = [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in cols] miss = df.agg(*exprs) miss_long = miss.select(F.explode(F.create_map([F.lit(x) for kv in sum(([F.lit(c), F.col(c)] for c in miss.columns), [])])).alias("col", "missing_count")) miss_long = miss_long.withColumn("total_rows", F.lit(df.count()))
.withColumn("missing_rate", F.col("missing_count") / F.col("total_rows")) return miss_long

def check_primary_key_uniqueness(df: DataFrame, pk: str) -> DataFrame: if pk not in df.columns: return None dup = df.groupBy(pk).count().where(F.col("count") > 1) return dup

def check_exact_duplicates(df: DataFrame) -> DataFrame: # 完全相同行(全列相同) grp_cols = df.columns dup = df.groupBy(*grp_cols).count().where(F.col("count") > 1) return dup

def check_semantic_duplicates(df: DataFrame, cfg: dict) -> DataFrame: # 7天内重复申请:同(id_number, loan_amount, loan_term_months) needed = ["id_number", "loan_amount", "loan_term_months", "app_datetime"] exists, _ = ensure_columns(df, needed) if len(exists) < 4: return None w = Window.partitionBy("id_number", "loan_amount", "loan_term_months").orderBy(F.col("app_datetime").cast("timestamp")) prev_ts = F.lag("app_datetime").over(w) seconds_diff = F.when(prev_ts.isNotNull(), F.col("app_datetime").cast("long") - prev_ts.cast("long")).otherwise(None) within = df.withColumn("prev_app_datetime", prev_ts)
.withColumn("seconds_from_prev", seconds_diff)
.withColumn("repeat_within_days", (F.col("seconds_from_prev").isNotNull()) & (F.col("seconds_from_prev") <= F.lit(cfg["business_rules"]["repeat_within_days"] * 86400))) return within.where(F.col("repeat_within_days") == True)

def check_type_consistency_flags(df: DataFrame) -> DataFrame: cast_cols = [c for c in df.columns if c.endswith("__cast_ok")] if not cast_cols: return None bad = [] for c in cast_cols: src = c.replace("__cast_ok", "") bad.append(F.sum((~F.col(c)).cast("int")).alias(src)) out = df.agg(*bad) out_long = out.select(F.explode(F.create_map([F.lit(x) for kv in sum(([F.lit(c), F.col(c)] for c in out.columns), [])])).alias("col", "failed_casts")) return out_long

def check_value_ranges(df: DataFrame, ranges: dict) -> DataFrame: violations = None for col, r in ranges.items(): if col not in df.columns: continue conds = [] if r.get("min") is not None: conds.append(F.col(col) < F.lit(r["min"])) if r.get("max") is not None: conds.append(F.col(col) > F.lit(r["max"])) if not conds: continue v = df.where(conds[0] if len(conds) == 1 else (conds[0] | conds[1]))
.select(F.lit(col).alias("col"), F.col(col).alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) violations = v if violations is None else violations.unionByName(v) return violations

def check_enums(df: DataFrame, enums: dict) -> DataFrame: violations = None for col, allowed in enums.items(): if col not in df.columns or not allowed: continue v = df.where(~F.col(col).isin(allowed) & F.col(col).isNotNull())
.select(F.lit(col).alias("col"), F.col(col).alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) violations = v if violations is None else violations.unionByName(v) return violations

def check_formats(df: DataFrame, cfg: dict) -> DataFrame: viols = None # IP if "ip" in df.columns: v1 = df.where(F.col("ip").isNotNull() & (~F.col("ip").rlike(cfg["format_checks"]["ipv4_regex"])))
.select(F.lit("ip").alias("col"), F.col("ip").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v1 if viols is None else viols.unionByName(v1) # id_number哈希长度 if CONFIG["id_number_col"] in df.columns: v2 = df.where(F.col(CONFIG["id_number_col"]).isNotNull() & (~F.length(F.col(CONFIG["id_number_col"])).isin(cfg["format_checks"]["id_hash_lengths"])))
.select(F.lit(CONFIG["id_number_col"]).alias("col"), F.col(CONFIG["id_number_col"]).alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v2 if viols is None else viols.unionByName(v2) # device_id if "device_id" in df.columns: v3 = df.where(F.col("device_id").isNotNull() & (~F.col("device_id").rlike(cfg["format_checks"]["device_id_regex"])))
.select(F.lit("device_id").alias("col"), F.col("device_id").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v3 if viols is None else viols.unionByName(v3) # 省份白名单 if "province" in df.columns and cfg["format_checks"]["province_whitelist"]: v4 = df.where(F.col("province").isNotNull() & (~F.col("province").isin(cfg["format_checks"]["province_whitelist"])))
.select(F.lit("province").alias("col"), F.col("province").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v4 if viols is None else viols.unionByName(v4) return viols

def check_business_rules(df: DataFrame, cfg: dict) -> DataFrame: br = cfg["business_rules"] viols = None now = F.current_timestamp() # 时间窗口:不在未来+5分钟,且不早于past_days if CONFIG["timestamp_col"] in df.columns: v1 = df.where( (F.col(CONFIG["timestamp_col"]) > F.expr(f"timestampadd(MINUTE, {br['app_datetime_future_minutes']}, current_timestamp())")) | (F.col(CONFIG["timestamp_col"]) < F.expr(f"timestampadd(DAY, -{br['app_datetime_past_days']}, current_timestamp())")) ).select(F.lit(CONFIG["timestamp_col"]).alias("rule"), F.col(CONFIG["timestamp_col"]).alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v1 if viols is None else viols.unionByName(v1) # DTI if "debt_total" in df.columns and "income_monthly" in df.columns: dti = df.withColumn("DTI", F.when(F.col("income_monthly") > 0, F.col("debt_total") / F.col("income_monthly")) .otherwise(F.lit(None))) v2 = dti.where(F.col("DTI").isNotNull() & (F.col("DTI") > F.lit(br["max_dti_static"])))
.select(F.lit("DTI_gt_static").alias("rule"), F.col("DTI").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v2 if viols is None else viols.unionByName(v2) # income=0但debt>0的异常 v2b = df.where((F.col("income_monthly") <= 0) & (F.col("debt_total") > 0))
.select(F.lit("income_zero_but_debt_positive").alias("rule"), F.struct("income_monthly", "debt_total").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v2b if viols is None else viols.unionByName(v2b) # 年龄与工作年限 if "age" in df.columns and "employment_length_years" in df.columns: v3 = df.where(F.col("age") < (F.col("employment_length_years") + F.lit(br["min_age_minus_employment_years"])))
.select(F.lit("age_vs_employment_years").alias("rule"), F.struct("age", "employment_length_years").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v3 if viols is None else viols.unionByName(v3) # 贷款金额必须 > 0 if "loan_amount" in df.columns: v4 = df.where(F.col("loan_amount") <= 0)
.select(F.lit("loan_amount_positive").alias("rule"), F.col("loan_amount").alias("value"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) viols = v4 if viols is None else viols.unionByName(v4) return viols

def check_statistical_outliers(df: DataFrame, cfg: dict) -> DataFrame: violations = None for col in cfg["core_numeric_cols"]: if col not in df.columns: continue bounds = approx_iqr_bounds(df.where(F.col(col).isNotNull()), col, cfg["iqr_multiplier"]) if not bounds: continue lower, upper = bounds v = df.where((F.col(col) < F.lit(lower)) | (F.col(col) > F.lit(upper)))
.select(F.lit(col).alias("col"), F.col(col).alias("value"), F.lit(lower).alias("iqr_lower"), F.lit(upper).alias("iqr_upper"), *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]) violations = v if violations is None else violations.unionByName(v) return violations

def check_time_continuity(df: DataFrame, cfg: dict) -> (DataFrame, DataFrame): # 连续性检查:按天/小时统计并找出缺口 ts = CONFIG["timestamp_col"] if ts not in df.columns: return None, None # 聚合 daily = df.groupBy(F.to_date(F.col(ts)).alias("date")).count() hourly = df.groupBy(F.date_format(F.col(ts), "yyyy-MM-dd HH:00:00").alias("hour")).count()

# 生成日期序列
min_max = df.agg(F.min(F.to_date(F.col(ts))).alias("min_d"),
                 F.max(F.to_date(F.col(ts))).alias("max_d")).collect()[0]
if min_max["min_d"] is None or min_max["max_d"] is None:
    return daily, hourly
min_d = min_max["min_d"]
max_d = min_max["max_d"]

spark = df.sql_ctx.sparkSession
num_days = (max_d - min_d).days + 1  # min_d/max_d are Python dates collected above
dates = spark.range(0, num_days)\
             .withColumn("date", F.expr(f"date_add(to_date('{min_d}'), cast(id as int))"))\
             .select("date")
daily_full = dates.join(daily, on="date", how="left").fillna({"count": 0})
missing_days = daily_full.where(F.col("count") == 0)

# 小时级序列(窗口按整点)
# 计算小时范围
min_max_h = df.agg(F.min(F.date_trunc("hour", F.col(ts))).alias("min_h"),
                   F.max(F.date_trunc("hour", F.col(ts))).alias("max_h")).collect()[0]
if min_max_h["min_h"] is None or min_max_h["max_h"] is None:
    return daily_full, hourly
min_h = min_max_h["min_h"]
max_h = min_max_h["max_h"]
hours = spark.range(0, (int((max_h.timestamp() - min_h.timestamp()) // 3600) + 1))\
             .withColumn("hour", F.expr(f"timestampadd(HOUR, cast(id as int), timestamp('{min_h}'))"))\
             .select("hour")
hourly_full = hours.join(hourly.withColumn("hour", F.col("hour").cast("timestamp")), on="hour", how="left").fillna({"count": 0})
missing_hours = hourly_full.where(F.col("count") == 0)

# 返回覆盖与缺口
return missing_days, missing_hours

# -------------------------------
# Main flow
# -------------------------------

parser = argparse.ArgumentParser()
parser.add_argument("--input", required=True, help="Input path (directory or file)")
parser.add_argument("--input-format", default="parquet", choices=["parquet", "csv", "json"], help="Input format")
parser.add_argument("--output", required=True, help="Output directory")
parser.add_argument("--streaming", default="false", choices=["true", "false"], help="Use Structured Streaming")
parser.add_argument("--csv-header", default="true", choices=["true", "false"], help="Whether the CSV has a header row")
args = parser.parse_args()

spark = SparkSession.builder.appName("DQ_Loan_Applications").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# 读入
if args.streaming == "true":
    # 流式模式(示例):以文件夹为源,micro-batch执行;注意:某些汇总(如approxQuantile)需使用complete/append触发器或批流分离
    if args.input_format == "parquet":
        df = spark.readStream.format("parquet").load(args.input)
    elif args.input_format == "csv":
        df = spark.readStream.option("header", args.csv_header == "true").option("inferSchema", "true").csv(args.input)
    else:
        df = spark.readStream.json(args.input)
    # 简化:流式仅做记录级别规则检查与落地(完整统计建议离线批处理)
    print("注意:流式模式下统计型/IQR类校验建议离线批处理运行,本脚本流式仅演示结构,可在生产中分层实现。")
    sys.exit(0)
else:
    if args.input_format == "parquet":
        df = spark.read.parquet(args.input)
    elif args.input_format == "csv":
        df = spark.read.option("header", args.csv_header == "true").option("inferSchema", "true").csv(args.input)
    else:
        df = spark.read.json(args.input)

# 统一类型并标记类型转换失败
df = unify_types(df, CONFIG["schema_expectation"])

# 1) 缺失值检查
missing_report = check_missingness(df, CONFIG)
write_df(missing_report, f"{args.output}/missing_report")

# 2) 重复值检查
pk_dups = check_primary_key_uniqueness(df, CONFIG["primary_key"])
write_df(pk_dups, f"{args.output}/primary_key_duplicates")

exact_dups = check_exact_duplicates(df)
write_df(exact_dups, f"{args.output}/exact_duplicates")

semantic_dups = check_semantic_duplicates(df, CONFIG)
write_df(semantic_dups, f"{args.output}/semantic_duplicates")

# 3) 统计异常(IQR)
stat_outliers = check_statistical_outliers(df, CONFIG)
write_df(stat_outliers, f"{args.output}/statistical_outliers")

# 4) 业务规则异常
biz_violations = check_business_rules(df, CONFIG)
write_df(biz_violations, f"{args.output}/business_rule_violations")

# 5) 类型一致性检查(转换失败计数)
type_cast_report = check_type_consistency_flags(df)
write_df(type_cast_report, f"{args.output}/type_cast_report")

# 6) 主键唯一性(已在2中输出详细重复明细),这里输出唯一性状态
if pk_dups is not None:
    pk_dup_count = pk_dups.count()
else:
    pk_dup_count = None

# 7) 值域范围检查(数值范围)
range_violations = check_value_ranges(df, CONFIG["ranges"])
write_df(range_violations, f"{args.output}/range_violations")

# 8) 格式规范性检查
format_violations = check_formats(df, CONFIG)
write_df(format_violations, f"{args.output}/format_violations")

# 9) 逻辑一致性检查(在业务规则中覆盖了跨字段主要逻辑)
# 若需额外逻辑可在check_business_rules中扩展

# 10) 时间序列连续性检查
missing_days, missing_hours = check_time_continuity(df, CONFIG)
write_df(missing_days, f"{args.output}/missing_days")
write_df(missing_hours, f"{args.output}/missing_hours")

# 构建汇总指标
total_rows = df.count()
summary_rows = []

def cnt(df_):
    return None if df_ is None else df_.count()

summary_rows.append(("total_rows", total_rows))
summary_rows.append(("missing_report_cols", cnt(missing_report)))
summary_rows.append(("primary_key_dup_count", cnt(pk_dups)))
summary_rows.append(("exact_duplicates_groups", cnt(exact_dups)))
summary_rows.append(("semantic_duplicates_rows", cnt(semantic_dups)))
summary_rows.append(("statistical_outliers_rows", cnt(stat_outliers)))
summary_rows.append(("business_rule_violations_rows", cnt(biz_violations)))
summary_rows.append(("type_cast_failed_cols", cnt(type_cast_report)))
summary_rows.append(("range_violations_rows", cnt(range_violations)))
summary_rows.append(("format_violations_rows", cnt(format_violations)))
summary_rows.append(("missing_days", cnt(missing_days)))
summary_rows.append(("missing_hours", cnt(missing_hours)))

summary_df = spark.createDataFrame(summary_rows, schema=["metric", "value"])
write_df(summary_df, f"{args.output}/summary_metrics")

# 导出各类异常样本(便于快速排查)
sample_targets = {
    "statistical_outliers": stat_outliers,
    "business_rule_violations": biz_violations,
    "range_violations": range_violations,
    "format_violations": format_violations,
    "semantic_duplicates": semantic_dups,
    "primary_key_duplicates": pk_dups
}
for name, d in sample_targets.items():
    if d is not None and d.count() > 0:
        write_df(d.limit(CONFIG["sample_rows"]), f"{args.output}/samples/{name}")

print("DQ校验完成。汇总指标:")
summary_df.show(truncate=False)

spark.stop()

if name == "main": main()

Implementation and extension notes:

  • Externalize configuration: move CONFIG into a JSON/YAML file loaded via a command-line argument, so rules can differ per channel/batch.
  • Channel completeness: on top of the missing-value check, add a per-channel completeness pivot (groupBy('channel') with missing rates) to localize channel-specific issues — see the sketch below.
  • Risk-feature drift: add per-month/per-channel distribution statistics compared against a baseline to catch abnormal input shifts (best implemented in the data-monitoring layer rather than this script).
  • Streaming: for Structured Streaming, clean at the ODS layer (type unification / format checks), persist, then run the statistical/IQR/continuity checks as offline batch jobs for stable, auditable results.
  • Metrics storage: write summary_metrics and the violation details to Hive/Delta and generate daily reports/alerts.
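A sketch of the per-channel completeness pivot mentioned above; it assumes the DataFrame returned by unify_types and a hand-picked list of columns to profile:

from pyspark.sql import functions as F

def channel_missing_rates(df, cols):
    # Missing-value rate per channel for the given columns
    aggs = [F.avg(F.col(c).isNull().cast("double")).alias(f"{c}_missing_rate") for c in cols]
    return df.groupBy("channel").agg(F.count(F.lit(1)).alias("rows"), *aggs)

# Usage:
# channel_missing_rates(df, ["income_monthly", "credit_score", "device_id"]).show(truncate=False)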

Below is a scalable data-integrity validation script for high-volume, minute-level industrial sensor data (about 800 million rows over 90 days). It uses PySpark and covers:

  • Missing-value check
  • Duplicate check
  • Outlier check (statistical, IQR)
  • Outlier check (business rules)
  • Data-type consistency check (including regex/enum)
  • Primary-key uniqueness check (device_id + timestamp)
  • Value-range check
  • Format check (IDs / version strings)
  • Logical-consistency check (including pre/post firmware-upgrade scale and distribution consistency)
  • Time-series continuity check (minute-level gaps / jumps / retransmissions)

Notes and assumptions:

  • Run incrementally per partition (e.g. by site/date) to avoid a full-table scan — see the sketch after this list.
  • Statistical outliers use approximate per-device quantiles (percentile_approx), which is robust and scales to this data volume.
  • The firmware-upgrade consistency check is optional and enabled by default; windows and thresholds are configurable.
  • Business-rule bounds are defaults; confirm and adjust them with site engineers before relying on them.
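A sketch of the partition-scoped incremental run suggested in the first point above; the partition columns (site, date) and the path are assumptions to adapt to the actual table layout:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("SensorDQIncremental").getOrCreate()

# Assumed path and partition columns; adjust to the real layout
df_day = (spark.read.format("delta")
          .load("dbfs:/tmp/sensor_delta")
          .where((F.col("site") == "SITE_A") & (F.col("date") == "2024-06-01")))

# Run the checks below against df_day instead of the full 90-day table.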

Script (PySpark 3.x):

import functools  # used to OR together the per-metric rule conditions
import re

from pyspark.sql import SparkSession, functions as F, types as T, Window as W

# -----------------------------
# Configuration (modify as needed)
# -----------------------------

CONFIG = { "input": { "format": "delta", # 可选:delta/parquet/table "path_or_table": "db.sensor_delta" # 路径或表名 }, "output": { "base_path": "dbfs:/tmp/quality_checks", # 或 HDFS/S3 路径 "save_mode": "overwrite" # overwrite/append }, "spark": { "shuffle_partitions": 2000, "adaptive_enabled": "true" }, "schema": { "device_id": T.StringType(), "timestamp": T.TimestampType(), # UTC "temperature": T.DoubleType(), # ℃ "humidity": T.DoubleType(), # %RH "vibration_rms": T.DoubleType(), # g "status": T.StringType(), # 枚举 "firmware_version": T.StringType(), "line_id": T.StringType(), "site": T.StringType() }, "primary_key": ["device_id", "timestamp"], "sampling": { "interval_seconds": 60, "expect_minute_aligned": True, # 时间戳是否应对齐至整分钟(秒=0) "alignment_tolerance_seconds": 0 # 若允许抖动可设为 >0 }, "domain_rules": { "status_enum": ["RUNNING","IDLE","MAINTENANCE","OFF","ERROR"], "device_id_regex": r"^[A-Za-z0-9_-.]+$", "firmware_semver_regex": r"^\d+.\d+.\d+(?:-[A-Za-z0-9]+)?$", "temperature_range": [-40.0, 125.0], # ℃(示例) "humidity_range": [0.0, 100.0], # %RH(物理边界) "vibration_rms_range": [0.0, 50.0], # g(示例,上限请现场确认) "rate_limits_per_min": { # 每分钟变化率上限(示例) "temperature": 10.0, # ℃/min "humidity": 20.0, # %RH/min "vibration_rms": 5.0 # g/min } }, "stat_outlier": { "use_iqr": True, "iqr_k": 3.0, # IQR*3 之外视为异常 "group_level": "device_id", # 以设备为组做分位统计 "approx_pct": [0.25, 0.5, 0.75], "rel_error": 1e-3 }, "time_bounds": { # 时间合理性(可选) "min_days_ago": 120, # 不早于当前时间-120天 "max_hours_ahead": 1 # 不晚于当前时间+1小时 }, "firmware_consistency_check": { "enabled": True, "pre_window_minutes": 60, "post_window_minutes": 60, "status_for_analysis": ["RUNNING"], # 仅在运行工况下对比 "ratio_tolerance": 0.15, # 核心指标中位数前后比偏离阈值(15%) "metrics": ["temperature", "humidity", "vibration_rms"] } }

# -----------------------------
# Session and input
# -----------------------------

spark = SparkSession.builder.appName("SensorDataQualityChecks").getOrCreate() spark.conf.set("spark.sql.shuffle.partitions", str(CONFIG["spark"]["shuffle_partitions"])) spark.conf.set("spark.sql.adaptive.enabled", CONFIG["spark"]["adaptive_enabled"])

expected_schema = T.StructType([T.StructField(k, v, True) for k, v in CONFIG["schema"].items()])

input_cfg = CONFIG["input"] if input_cfg["format"] == "table": df_raw = spark.table(input_cfg["path_or_table"]) elif input_cfg["format"] in ("delta", "parquet"): df_raw = spark.read.format(input_cfg["format"]).schema(expected_schema).load(input_cfg["path_or_table"]) else: raise ValueError("Unsupported input format")

df = df_raw.select([F.col(c) for c in CONFIG["schema"].keys()])

# -----------------------------
# Type unification and basic derived columns
# -----------------------------

# Force-cast every column once more, to be safe

for c, t in CONFIG["schema"].items(): df = df.withColumn(c, F.col(c).cast(t))

# Helper column for timestamp-alignment checks

df = df.withColumn("unix_ts", F.col("timestamp").cast("long"))

# -----------------------------
# 1) Missing-value check
# -----------------------------

missing_counts = df.select([F.sum(F.col(c).isNull().cast("bigint")).alias(c) for c in df.columns]) missing_counts.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/missing_counts")

# -----------------------------
# 2) Data-type consistency and format checks
# -----------------------------

# Enum / regex rules

status_ok = F.col("status").isin(CONFIG["domain_rules"]["status_enum"]) device_id_ok = F.col("device_id").rlike(CONFIG["domain_rules"]["device_id_regex"]) fw_ok = F.col("firmware_version").rlike(CONFIG["domain_rules"]["firmware_semver_regex"]) | F.col("firmware_version").isNull()

dtype_violations = df.where(~status_ok | ~device_id_ok | ~fw_ok) dtype_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/dtype_format_violations")

# -----------------------------
# 3) Primary-key uniqueness and duplicate checks
# -----------------------------

pk = CONFIG["primary_key"] dups = df.groupBy(*pk).count().where(F.col("count") > 1) dups.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/duplicate_pk")

# Full-row duplicates (optional): hash the key plus all fields to detect exact duplicates

row_hash = F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("NULL")) for c in df.columns]), 256) df_with_hash = df.withColumn("row_hash", row_hash) full_dups = df_with_hash.groupBy("row_hash").count().where(F.col("count") > 1) full_dups.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/duplicate_rows")

# -----------------------------
# 4) Value-range check (physical / business bounds)
# -----------------------------

r = CONFIG["domain_rules"] range_violations = df.where( (F.col("temperature").isNotNull() & ((F.col("temperature") < F.lit(r["temperature_range"][0])) | (F.col("temperature") > F.lit(r["temperature_range"][1])))) | (F.col("humidity").isNotNull() & ((F.col("humidity") < F.lit(r["humidity_range"][0])) | (F.col("humidity") > F.lit(r["humidity_range"][1])))) | (F.col("vibration_rms").isNotNull() & ((F.col("vibration_rms") < F.lit(r["vibration_rms_range"][0])) | (F.col("vibration_rms") > F.lit(r["vibration_rms_range"][1])))) ) range_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/range_violations")

# -----------------------------
# 5) Time-series continuity check (minute level)
# -----------------------------

w_dev_time = W.partitionBy("device_id").orderBy("timestamp") df_seq = df.select(*df.columns).withColumn("prev_ts", F.lag("timestamp").over(w_dev_time))
.withColumn("prev_unix", F.col("prev_ts").cast("long"))
.withColumn("delta_sec", F.col("unix_ts") - F.col("prev_unix"))

# Gaps and retransmissions

interval = CONFIG["sampling"]["interval_seconds"] gaps = df_seq.where(F.col("prev_ts").isNotNull() & (F.col("delta_sec") > interval))
.withColumn("missing_points", (F.col("delta_sec")/interval - F.lit(1)).cast("bigint")) gaps.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/time_gaps")

retransmissions = df_seq.where(F.col("prev_ts").isNotNull() & (F.col("delta_sec") == 0)) retransmissions.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/retransmissions")

# Alignment (seconds within the minute should be 0)

if CONFIG["sampling"]["expect_minute_aligned"]: aligned = df.withColumn("sec_in_min", (F.col("unix_ts") % 60).cast("int")) misaligned = aligned.where(F.abs(F.col("sec_in_min")) > CONFIG["sampling"]["alignment_tolerance_seconds"]) misaligned.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/misaligned_timestamps")

# Time sanity bounds (optional)

now = F.current_timestamp() upper_bound = F.expr(f"timestampadd(HOUR, {CONFIG['time_bounds']['max_hours_ahead']}, current_timestamp())") lower_bound = F.expr(f"timestampadd(DAY, -{CONFIG['time_bounds']['min_days_ago']}, current_timestamp())") time_violations = df.where((F.col("timestamp") < lower_bound) | (F.col("timestamp") > upper_bound)) time_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/time_violations")

# -----------------------------
# 6) Outlier check (statistical, IQR-based, per device)
# -----------------------------

pct = CONFIG["stat_outlier"]["approx_pct"] rel_err = CONFIG["stat_outlier"]["rel_error"] group_col = CONFIG["stat_outlier"]["group_level"] iqr_k = CONFIG["stat_outlier"]["iqr_k"]

def iqr_bounds(df_in, col): stats = df_in.groupBy(group_col).agg( F.expr(f"percentile_approx({col}, array({pct[0]},{pct[1]},{pct[2]}), {int(1/rel_err)})").alias("p") ).select( F.col(group_col), F.col("p").getItem(0).alias("q1"), F.col("p").getItem(1).alias("q2"), F.col("p").getItem(2).alias("q3") ).withColumn("iqr", F.col("q3") - F.col("q1"))
.withColumn("lo", F.col("q1") - F.lit(iqr_k)*F.col("iqr"))
.withColumn("hi", F.col("q3") + F.lit(iqr_k)*F.col("iqr")) return stats

metrics = ["temperature","humidity","vibration_rms"] outlier_union = None

for m in metrics: stats_m = iqr_bounds(df.where(F.col(m).isNotNull()), m) flagged = df.join(stats_m, on=group_col, how="left")
.where(F.col(m).isNotNull() & ((F.col(m) < F.col("lo")) | (F.col(m) > F.col("hi"))))
.select("device_id","timestamp", F.lit(m).alias("metric"), F.col(m).alias("value"), "lo","hi","q1","q2","q3") outlier_union = flagged if outlier_union is None else outlier_union.unionByName(flagged)

if outlier_union is not None: outlier_union.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/stat_outliers")

# -----------------------------
# 7) Outlier check (business rules: rate of change / operating state)
# -----------------------------

rate = CONFIG["domain_rules"]["rate_limits_per_min"] df_rate = df.select("device_id","timestamp","temperature","humidity","vibration_rms")
.withColumn("prev_temperature", F.lag("temperature").over(w_dev_time))
.withColumn("prev_humidity", F.lag("humidity").over(w_dev_time))
.withColumn("prev_vibration_rms", F.lag("vibration_rms").over(w_dev_time))
.withColumn("prev_ts", F.lag("timestamp").over(w_dev_time))
.withColumn("delta_min", (F.col("unix_ts") - F.col("prev_ts").cast("long"))/60.0)

biz_violations = df_rate.where(F.col("prev_ts").isNotNull() & (F.col("delta_min") > 0)).select( "device_id","timestamp","delta_min", (F.abs(F.col("temperature")-F.col("prev_temperature"))/F.col("delta_min") > F.lit(rate["temperature"])).alias("temp_rate_flag"), (F.abs(F.col("humidity")-F.col("prev_humidity"))/F.col("delta_min") > F.lit(rate["humidity"])).alias("hum_rate_flag"), (F.abs(F.col("vibration_rms")-F.col("prev_vibration_rms"))/F.col("delta_min") > F.lit(rate["vibration_rms"])).alias("vib_rate_flag"), "temperature","prev_temperature","humidity","prev_humidity","vibration_rms","prev_vibration_rms" ).where("temp_rate_flag OR hum_rate_flag OR vib_rate_flag")

biz_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/biz_rule_outliers")

# -----------------------------
# 8) Logical consistency: firmware version must not regress; pre/post-upgrade scale and distribution consistency
# -----------------------------

def parse_semver_col(col): # 提取主次修订号;非匹配置为 NULL return F.when(F.col("firmware_version").rlike(CONFIG["domain_rules"]["firmware_semver_regex"]), F.struct( F.regexp_extract(col, r"^(\d+)", 1).cast("int").alias("maj"), F.regexp_extract(col, r"^\d+.(\d+)", 1).cast("int").alias("min"), F.regexp_extract(col, r"^\d+.\d+.(\d+)", 1).cast("int").alias("pat") )).otherwise(F.lit(None))

df_fw = df.withColumn("semver", parse_semver_col(F.col("firmware_version"))) w_fw = W.partitionBy("device_id").orderBy("timestamp") df_fw = df_fw.withColumn("prev_semver", F.lag("semver").over(w_fw))

# Version downgrades (non-null comparison)

downgrades = df_fw.where(F.col("semver").isNotNull() & F.col("prev_semver").isNotNull() & ( (F.col("semver.maj") < F.col("prev_semver.maj")) | ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") < F.col("prev_semver.min"))) | ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") == F.col("prev_semver.min")) & (F.col("semver.pat") < F.col("prev_semver.pat"))) )) downgrades.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/firmware_downgrades")

# Locate upgrade events

upgrades = df_fw.where(F.col("semver").isNotNull() & F.col("prev_semver").isNotNull() & ( (F.col("semver.maj") > F.col("prev_semver.maj")) | ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") > F.col("prev_semver.min"))) | ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") == F.col("prev_semver.min")) & (F.col("semver.pat") > F.col("prev_semver.pat"))) )).select("device_id","timestamp","firmware_version").withColumnRenamed("timestamp","upgrade_ts")

if CONFIG["firmware_consistency_check"]["enabled"]: pre_m = CONFIG["firmware_consistency_check"]["pre_window_minutes"] post_m = CONFIG["firmware_consistency_check"]["post_window_minutes"] run_status = CONFIG["firmware_consistency_check"]["status_for_analysis"] metrics_for_fw = CONFIG["firmware_consistency_check"]["metrics"] tol = CONFIG["firmware_consistency_check"]["ratio_tolerance"]

# 将升级事件与同设备数据进行窗口联接(限定前后窗口,减少放大联接)
df_run = df.where(F.col("status").isin(run_status)).select("device_id","timestamp", *metrics_for_fw)

# 预备:把升级事件展开为左右窗口范围
upgrades_range = upgrades.withColumn("pre_start", F.expr(f"timestampadd(MINUTE, -{pre_m}, upgrade_ts)")) \
                         .withColumn("post_end", F.expr(f"timestampadd(MINUTE, {post_m}, upgrade_ts)"))
# 左右窗口聚合中位数
pre_join = df_run.alias("d").join(upgrades_range.alias("u"),
           (F.col("d.device_id")==F.col("u.device_id")) & (F.col("d.timestamp")>=F.col("u.pre_start")) & (F.col("d.timestamp")<F.col("u.upgrade_ts")),
           "inner")
post_join = df_run.alias("d").join(upgrades_range.alias("u"),
           (F.col("d.device_id")==F.col("u.device_id")) & (F.col("d.timestamp")>F.col("u.upgrade_ts")) & (F.col("d.timestamp")<=F.col("u.post_end")),
           "inner")

pre_aggs = pre_join.groupBy("u.device_id","u.upgrade_ts").agg(
    *[F.expr(f"percentile_approx({m}, 0.5, 1000)").alias(f"pre_{m}_p50") for m in metrics_for_fw]
)
post_aggs = post_join.groupBy("u.device_id","u.upgrade_ts").agg(
    *[F.expr(f"percentile_approx({m}, 0.5, 1000)").alias(f"post_{m}_p50") for m in metrics_for_fw]
)
fw_cmp = pre_aggs.join(post_aggs, ["device_id","upgrade_ts"], "inner")

# 计算比值偏离
for m in metrics_for_fw:
    fw_cmp = fw_cmp.withColumn(f"{m}_ratio", F.col(f"post_{m}_p50")/F.col(f"pre_{m}_p50"))

# 标记超阈事件
conds = [ (F.col(f"{m}_ratio") > (1+tol)) | (F.col(f"{m}_ratio") < (1-tol)) for m in metrics_for_fw ]
fw_inconsistent = fw_cmp.where(functools.reduce(lambda a, b: a | b, conds))
fw_inconsistent.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/firmware_metric_inconsistency")

# -----------------------------
# 9) Logical consistency (extensible example)
# Example: when status is OFF/MAINTENANCE, vibration_rms should be close to 0
# (if applicable; confirm on site)
# -----------------------------

logic_examples = df.where( (F.col("status").isin(["OFF","MAINTENANCE"])) & (F.col("vibration_rms").isNotNull()) & (F.col("vibration_rms") > 0.2) # 示例阈值,按现场调整 ) logic_examples.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/logic_inconsistency_examples")

# -----------------------------
# 10) Summary metrics output
# -----------------------------

summary = {}

# Total row count

summary["total_rows"] = df.count() summary_df = spark.createDataFrame([(k, v) for k, v in summary.items()], schema="metric string, value string") summary_df.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/summary")

# Counts of key violations

def save_count(df_in, name): cnt = df_in.count() spark.createDataFrame([(name, cnt)], "metric string, value long")
.write.mode("append").format("delta").save(f"{CONFIG['output']['base_path']}/summary")

save_count(dups, "duplicate_pk") save_count(full_dups, "duplicate_rows") save_count(range_violations, "range_violations") save_count(gaps, "time_gaps_events") save_count(retransmissions, "retransmissions") if CONFIG["sampling"]["expect_minute_aligned"]: save_count(misaligned, "misaligned_timestamps") if outlier_union is not None: save_count(outlier_union, "stat_outliers") save_count(biz_violations, "biz_rule_outliers") save_count(dtype_violations, "dtype_format_violations") save_count(time_violations, "time_violations") save_count(downgrades, "firmware_downgrades") if CONFIG["firmware_consistency_check"]["enabled"]: save_count(fw_inconsistent, "firmware_metric_inconsistency") save_count(logic_examples, "logic_inconsistency_examples")

print("Data quality checks completed. Results saved to:", CONFIG["output"]["base_path"])

Usage notes:

  • Performance and resources:
    • Strongly recommended: run in batches per partition (site/date or a device_id hash) to limit shuffle and join amplification.
    • percentile_approx aggregated per device stays robust at thousands of devices; for very large device counts, aggregate at site/line level instead.
  • Statistical outlier strategy:
    • IQR×3 is robust to spikes; for more sensitivity use IQR×1.5, or switch to median + MAD (requires a second aggregation) — see the sketch below.
  • Business rules:
    • The temperature, humidity, and vibration limits are examples; revise them against device specs and site limits.
    • Rate-of-change thresholds must be calibrated to sensor response and physical limits to avoid over-reporting.
  • Firmware consistency:
    • The median ratio over 60-minute windows before/after an upgrade is used to detect scale/calibration drift. If load changes significantly inside the window, add filters (e.g. a stable-operating-state flag) or widen the window.
  • Time continuity:
    • If gateways introduce alignment jitter, relax alignment_tolerance_seconds or resample by rounding timestamps to the minute.
  • Output:
    • Each sub-check writes a detailed Delta dataset, and summary holds the core counts, ready for downstream dashboards and alerting.
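If a median + MAD rule is preferred over IQR (as noted above), it can be computed with two grouped aggregations; a sketch per device, where the metric column name is a placeholder:

from pyspark.sql import functions as F

def mad_bounds(df, group_col="device_id", value_col="vibration_rms", k=3.0):
    # Per-group median, median absolute deviation, and the resulting bounds
    med = df.groupBy(group_col).agg(
        F.expr(f"percentile_approx({value_col}, 0.5, 1000)").alias("median"))
    dev = (df.join(med, group_col)
             .withColumn("abs_dev", F.abs(F.col(value_col) - F.col("median"))))
    return (dev.groupBy(group_col, "median")
               .agg(F.expr("percentile_approx(abs_dev, 0.5, 1000)").alias("mad"))
               .withColumn("lower", F.col("median") - k * F.col("mad"))
               .withColumn("upper", F.col("median") + k * F.col("mad")))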

Problem Solved

This prompt turns complex data-integrity checking into a reusable template: it automatically generates runnable Python scripts, tailors the checks to your dataset's characteristics, and quickly pinpoints missing values, duplicates, outliers, and rule conflicts. It helps data teams establish a standardized pre-load quality gate before reports go live, models are trained, or data is delivered, saving effort, reducing risk, and improving data trustworthiness, with explanations available in Chinese or English for cross-team collaboration. A trial produces a script in three steps; the paid version unlocks advanced check templates, result summaries, and team-collaboration features, turning data quality from experience-based into rule-based.

Target Users

Data analysts

When taking over a new data source or merging tables: quickly health-check data quality and auto-generate validation and repair scripts so reports and insights are not distorted by dirty data.

Data engineers

Insert checks at key pipeline stages, generate rules and reports in one step, intercept anomalies early, and reduce rollbacks and rework for stable data ingestion.

BI and operations teams

Verify field completeness and definition consistency before campaigns go live, fix data gaps in time, and keep metrics comparable and reviews trustworthy, reducing misjudgments and disputes.

Feature Summary

One-click generation of data-integrity check scripts, with rules and thresholds tailored to the dataset's characteristics
Automatic detection of missing values, duplicates, and outliers, with repair suggestions and example code to reduce cleaning costs
Required, unique, and value-range rules defined per business field, quickly pinpointing problem columns and records
Configurable tolerances and alert levels, with readable reports that align data quality with team standards
Adapts to many table structures and scenarios, with extensible checks that replace tedious hand-written scripts
Explains check results in context, pointing out business-risk impact and next steps
Templated parameter input, reusable across the team, shortening validation and delivery cycles
Fits into everyday analysis workflows: each run outputs check conclusions and an actionable repair checklist
Structured, clear explanations and reports for audit trails and cross-team communication

How to Use the Purchased Prompt Template

1. Use it directly in an external chat application

Copy the prompt generated by the template into your preferred chat application (such as ChatGPT or Claude) and use it in conversation, with no extra development. Suited to quick personal trials and lightweight use.

2. Publish it as an API

Turn the prompt template into an API: your program can modify the template parameters and call the interface directly, enabling automation and batch processing. Suited to developer integration and embedding into business systems.

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so your AI application can call the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move seamlessly between AI tools.


What You Get After Purchase

The complete prompt template
- 256 tokens
- 5 adjustable parameters:
{ dataset characteristics } { check scope } { output script language } { field-specific validation rules } { expected script output format }
Access to community-contributed content
- Curated community examples to help you get started with the prompt quickly