Data Integrity Validation Script Generator

💡 Core value: Given the characteristics of a dataset and its validation requirements, quickly generates executable Python scripts for systematic data integrity verification. The scripts cover the core checks (missing values, duplicates, outliers), standardize data quality assessment, make results reproducible, and provide reliable support for data cleaning, analysis, and modeling.

🎯 Customizable parameters (5)

Dataset characteristics
Core information about the dataset to validate, such as data source, business context, field meanings, data volume, and main field types.
Validation scope
The data integrity checks to perform.
Output script language
The programming language used for the generated validation script.
Field-specific validation rules
Detailed rules for specific fields, e.g. date formats, amount ranges, ID encoding rules.
Expected script output format
The form the script's output should take after execution.

🎨 Example output

The following is a reusable PySpark data integrity validation script covering the ten specified check categories. It scales to large volumes (4.8 million rows, 45 columns), supports CSV/Parquet input, applies consistent definitions with a modular layout, and emits a JSON report plus samples of problem rows. Business rules and enum configurations can be adjusted as needed.

1. Applicable scenario and assumptions

  • Data source: first-party e-commerce order system and payment gateway, loaded in T+1 batches.
  • Data volume: roughly 4.8 million rows over the last 3 months, 45 columns.
  • Primary key: order_id.
  • Core type constraints:
    • Integers/decimals/timestamps/enums/booleans stay consistent: quantity, unit_price (2 decimals), discount (decimal), pay_amount (decimal), pay_time/ship_time (timestamps), currency/status/channel (enums), is_refund (boolean).
  • Business characteristics:
    • Pronounced holiday peaks (not treated as anomalies).
    • Installment payments and wallet deductions exist: pay_amount = quantity*unit_price - discount is not enforced as an equality; a weak constraint applies instead (pay_amount must not exceed the gross amount; deductions may only lower it).
  • The concrete values of status/province are not hard-bound; they are configurable, and by default only format/non-null and logical-relation checks run.

2. Mapping of the main checks

  • Missing value check: key columns must not be NULL; missing counts are reported for non-key columns.
  • Duplicate value check: primary key uniqueness plus samples of duplicate rows.
  • Outlier check (statistical): IQR-based detection on quantity, unit_price, pay_amount.
  • Outlier check (business): hard constraints on amounts, times, refund logic, etc.
  • Data type consistency: force-cast to the expected schema and count failed conversions.
  • Primary key uniqueness: order_id must be unique.
  • Value range check: e.g. quantity >= 1, non-negative amounts (non-refund cases).
  • Format conformity check: currency code format, price decimal places (2), etc.
  • Logical consistency: time ordering (ship_time >= pay_time), refund amount signs, etc.
  • Time series continuity: day-level continuity of pay_time (with T+1 ingestion, data may extend only to yesterday).
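The statistical outlier checks in this mapping use the standard 1.5×IQR fence. As a minimal plain-Python sketch (the quartile values are hypothetical; the script below computes them with percentile_approx):

# Minimal sketch of the IQR fence used by the statistical outlier check.
# The quartiles here are made-up example values.
q1, q3 = 45.0, 120.0                        # 25th / 75th percentile of e.g. pay_amount
iqr = q3 - q1
low, high = q1 - 1.5 * iqr, q3 + 1.5 * iqr
print(f"flag values outside [{low}, {high}]")  # -> flag values outside [-67.5, 232.5]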

3. How to run

  • Dependencies: Spark 3.x, Python 3.8+.
  • Example invocation:
    spark-submit integrity_check_orders.py \
      --input-path s3://bucket/orders_3m/ \
      --input-format parquet \
      --output-dir s3://bucket/quality_report/ \
      --max-sample-rows 10000 \
      --currencies CNY \
      --t1-lag-days 1

4. PySpark script: integrity_check_orders.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import argparse
import json
import sys
from datetime import datetime

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T


def parse_args():
    p = argparse.ArgumentParser(description="Order dataset integrity checks")
    p.add_argument("--input-path", required=True, help="Input path (CSV/Parquet)")
    p.add_argument("--input-format", required=True, choices=["csv", "parquet"], help="Input format")
    p.add_argument("--output-dir", required=True, help="Output directory for reports and samples")
    p.add_argument("--max-sample-rows", type=int, default=10000, help="Max sample rows per failed check")
    p.add_argument("--currencies", type=str, default="CNY",
                   help="Comma-separated valid currency codes (3-letter). If unknown, format check only.")
    p.add_argument("--t1-lag-days", type=int, default=1,
                   help="T+1 ingestion lag days (used to bound the pay_time upper limit)")
    p.add_argument("--timezone", type=str, default="Asia/Shanghai", help="Timezone for date boundaries")
    return p.parse_args()


def build_spark():
    return (SparkSession.builder
            .appName("order_integrity_checks")
            .config("spark.sql.session.timeZone", "UTC")  # do arithmetic in UTC; adjust display if needed
            .getOrCreate())


def get_schema():
    # Use a permissive schema for robustness; cast later and track failures.
    # unit_price: 2 decimal places is the business rule; an interim Decimal(18, 4)
    # lets the format check catch values with more than 2 decimals.
    return T.StructType([
        T.StructField("order_id", T.StringType(), True),
        T.StructField("user_id", T.StringType(), True),
        T.StructField("sku_id", T.StringType(), True),
        T.StructField("item_name", T.StringType(), True),
        T.StructField("quantity", T.StringType(), True),  # read as string first
        T.StructField("unit_price", T.StringType(), True),
        T.StructField("currency", T.StringType(), True),
        T.StructField("discount", T.StringType(), True),
        T.StructField("pay_amount", T.StringType(), True),
        T.StructField("pay_time", T.StringType(), True),
        T.StructField("ship_time", T.StringType(), True),
        T.StructField("province", T.StringType(), True),
        T.StructField("status", T.StringType(), True),
        T.StructField("channel", T.StringType(), True),
        T.StructField("is_refund", T.StringType(), True),
        # ... other ~30 columns kept as-is
    ])


def cast_columns(df: DataFrame):
    # Keep raw columns for format checks; create typed columns for logic checks.
    # Decimal(18, 4) allows safe rounding checks; the business rule limits unit_price to 2 decimals.
    typed = (df.withColumn("quantity_t", F.col("quantity").cast("long"))
             .withColumn("unit_price_t", F.col("unit_price").cast(T.DecimalType(18, 4)))
             .withColumn("discount_t", F.col("discount").cast(T.DecimalType(18, 4)))
             .withColumn("pay_amount_t", F.col("pay_amount").cast(T.DecimalType(18, 4)))
             .withColumn("pay_time_t", F.to_timestamp("pay_time"))
             .withColumn("ship_time_t", F.to_timestamp("ship_time"))
             .withColumn("is_refund_t",
                         F.when(F.lower(F.col("is_refund")).isin("true", "1", "t", "yes"), F.lit(True))
                          .when(F.lower(F.col("is_refund")).isin("false", "0", "f", "no"), F.lit(False))
                          .otherwise(F.lit(None))))
    # Cast error flags: original value non-null but typed value null
    cast_errors = []
    for orig, typed_col in [
        ("quantity", "quantity_t"), ("unit_price", "unit_price_t"),
        ("discount", "discount_t"), ("pay_amount", "pay_amount_t"),
        ("pay_time", "pay_time_t"), ("ship_time", "ship_time_t"),
        ("is_refund", "is_refund_t"),
    ]:
        cast_errors.append(
            F.when(F.col(orig).isNotNull() & F.col(typed_col).isNull(), F.lit(orig)).otherwise(F.lit(""))
        )
    # Drop the empty-string placeholders so only failing column names remain
    cast_err_col = F.array_remove(F.array(*cast_errors), "")
    typed = typed.withColumn("type_error_cols", cast_err_col)
    return typed, typed.filter(F.size(F.col("type_error_cols")) > 0)


def write_sample(df: DataFrame, path: str, n: int):
    if df is None:
        return 0
    count = df.limit(n).count()
    if count > 0:
        df.limit(n).coalesce(1).write.mode("overwrite").option("header", True).csv(path)
    return count


def to_date_in_tz(col_ts, tz):
    # Convert a UTC timestamp to a calendar date in the given timezone
    return F.to_date(F.from_utc_timestamp(col_ts, tz))


def iqr_bounds(df: DataFrame, col: str):
    # Approximate percentiles avoid a full sort
    q = df.selectExpr(f"percentile_approx({col}, array(0.25, 0.75), 1000) as q").collect()[0]["q"]
    if q is None or len(q) < 2:
        return None
    q1, q3 = float(q[0]), float(q[1])
    iqr = q3 - q1
    return q1 - 1.5 * iqr, q3 + 1.5 * iqr


def main():
    args = parse_args()
    spark = build_spark()
    spark.sparkContext.setLogLevel("WARN")

    cfg = {
        "valid_currencies": [c.strip() for c in args.currencies.split(",") if c.strip()],
        "timezone": args.timezone,
        "t1_lag_days": args.t1_lag_days,
        "max_sample_rows": args.max_sample_rows
    }

    # 1) Load
    if args.input_format == "csv":
        df = (spark.read
              .option("header", True)
              .option("multiLine", True)
              .option("escape", "\"")
              .schema(get_schema())
              .csv(args.input_path))
    else:
        df = spark.read.parquet(args.input_path)
    df = df.persist()

    report = {"generated_at": datetime.utcnow().isoformat() + "Z",
              "checks": []}

    # 2) Type casting and type errors
    df_t, type_err_rows = cast_columns(df)
    df_t = df_t.persist()

    # Helper for appending a check result to the report
    def add_check(name, status, metrics=None, sample_rel_path=None, note=None):
        entry = {"name": name, "status": status}
        if metrics is not None:
            entry["metrics"] = metrics
        if sample_rel_path is not None:
            entry["sample"] = sample_rel_path
        if note is not None:
            entry["note"] = note
        report["checks"].append(entry)

    # 3) Missing value check
    required_cols = ["order_id", "user_id", "sku_id", "quantity_t",
                     "unit_price_t", "currency", "pay_amount_t", "pay_time_t",
                     "status", "channel", "is_refund_t"]
    miss_metrics = {}
    for c in required_cols:
        miss_metrics[c] = df_t.filter(F.col(c).isNull()).count()
    missing_total = sum(miss_metrics.values())
    status = "PASS" if missing_total == 0 else "FAIL"
    sample_path = None
    if missing_total > 0:
        cond = None
        for c in required_cols:
            cond = F.col(c).isNull() if cond is None else (cond | F.col(c).isNull())
        sample_df = df_t.filter(cond).select(*required_cols)  # order_id is the first required col
        sample_path = f"{args.output_dir}/samples/missing_required"
        write_sample(sample_df, sample_path, cfg["max_sample_rows"])
    add_check("Missing value check", status, miss_metrics, sample_path)

    # 4) Data type consistency check
    type_err_cnt = type_err_rows.count()
    type_status = "PASS" if type_err_cnt == 0 else "FAIL"
    type_sample_path = None
    if type_err_cnt > 0:
        type_sample_path = f"{args.output_dir}/samples/type_inconsistency"
        write_sample(type_err_rows.select("order_id", "type_error_cols", "quantity", "unit_price",
                                          "discount", "pay_amount", "pay_time", "ship_time", "is_refund"),
                     type_sample_path, cfg["max_sample_rows"])
    add_check("Data type consistency check", type_status, {"type_error_rows": type_err_cnt}, type_sample_path)

    # 5) Primary key uniqueness check + duplicate value check
    dup_df = df_t.groupBy("order_id").count().filter(F.col("count") > 1)
    dup_cnt = dup_df.count()
    dup_status = "PASS" if dup_cnt == 0 else "FAIL"
    dup_sample_path = None
    if dup_cnt > 0:
        dup_ids = dup_df.select("order_id").limit(cfg["max_sample_rows"])
        dup_rows = df_t.join(dup_ids, on="order_id", how="inner")
        dup_sample_path = f"{args.output_dir}/samples/duplicate_order_id"
        # Drop the array column, which CSV cannot serialize
        write_sample(dup_rows.drop("type_error_cols"), dup_sample_path, cfg["max_sample_rows"])
    add_check("Primary key uniqueness check", dup_status, {"duplicate_order_ids": dup_cnt}, dup_sample_path)
    add_check("Duplicate value check", dup_status, {"duplicate_order_ids": dup_cnt}, dup_sample_path)

    # 6) Value range check (hard constraints)
    # - quantity >= 1
    # - unit_price_t >= 0
    # - discount_t >= 0
    # - is_refund_t = False => pay_amount_t >= 0
    # - is_refund_t = True  => pay_amount_t <= 0 (0 allowed: net amount after a full reversal)
    # - pay_amount must not exceed the gross amount (weak constraint -> FAIL;
    #   wallet deductions may only lower the paid amount, never raise it)
    gross_amount = F.col("quantity_t").cast(T.DecimalType(18, 4)) * F.col("unit_price_t")
    vr_bad = df_t.filter(
        (F.col("quantity_t") < 1) |
        (F.col("unit_price_t") < 0) |
        (F.col("discount_t") < 0) |
        ((F.col("is_refund_t") == F.lit(False)) & (F.col("pay_amount_t") < 0)) |
        ((F.col("is_refund_t") == F.lit(True)) & (F.col("pay_amount_t") > 0)) |
        (F.col("pay_amount_t") > gross_amount + F.lit(0.0001)) |  # allow a minimal epsilon
        (F.col("discount_t") > gross_amount + F.lit(0.0001))
    )
    vr_cnt = vr_bad.count()
    vr_status = "PASS" if vr_cnt == 0 else "FAIL"
    vr_sample_path = None
    if vr_cnt > 0:
        vr_sample_path = f"{args.output_dir}/samples/value_range"
        write_sample(vr_bad.select("order_id", "quantity_t", "unit_price_t", "discount_t",
                                   "pay_amount_t", "is_refund_t"), vr_sample_path, cfg["max_sample_rows"])
    add_check("Value range check", vr_status, {"invalid_rows": vr_cnt}, vr_sample_path,
              note="Covers non-negative quantities/amounts and pay_amount not exceeding the gross amount; "
                   "no equality constraint because of wallet deductions.")

    # 7) Format conformity check
    # - currency: 3-letter uppercase; if valid_currencies is set, also check membership.
    # - unit_price must have at most 2 decimals: round(unit_price, 2) == unit_price
    if cfg["valid_currencies"]:
        fmt_bad_currency = df_t.filter(~F.col("currency").rlike("^[A-Z]{3}$") |
                                       ~F.col("currency").isin(cfg["valid_currencies"]))
    else:
        fmt_bad_currency = df_t.filter(~F.col("currency").rlike("^[A-Z]{3}$"))

    price_scale_bad = df_t.filter(F.col("unit_price_t").isNotNull() &
                                  (F.col("unit_price_t") != F.round(F.col("unit_price_t"), 2)))

    fmt_cnt = fmt_bad_currency.count() + price_scale_bad.count()
    fmt_status = "PASS" if fmt_cnt == 0 else "FAIL"
    fmt_sample_path = None
    if fmt_cnt > 0:
        fmt_sample_path = f"{args.output_dir}/samples/format_issues"
        sample_union = fmt_bad_currency.select("order_id", "currency").withColumn("issue", F.lit("currency")) \
            .unionByName(price_scale_bad.select("order_id", "currency").withColumn("issue", F.lit("unit_price_scale")))
        write_sample(sample_union, fmt_sample_path, cfg["max_sample_rows"])
    add_check("Format conformity check", fmt_status, {"issues": fmt_cnt}, fmt_sample_path,
              note="Validates currency code format/whitelist and that unit_price has at most 2 decimals.")

    # 8) Logical consistency check
    # - when ship_time exists, it must be >= pay_time
    # - pay_time must not be later than now - t1_lag_days (T+1 ingestion)
    # - refund amount signs are already covered by the value range check
    upper_bound_ts = F.expr(f"timestampadd(day, -{cfg['t1_lag_days']}, current_timestamp())")

    logic_bad = df_t.filter(
        (F.col("ship_time_t").isNotNull() & F.col("pay_time_t").isNotNull() &
         (F.col("ship_time_t") < F.col("pay_time_t"))) |
        (F.col("pay_time_t") > upper_bound_ts)  # pay_time must not pass the T+1 upper bound
    )
    logic_cnt = logic_bad.count()
    logic_status = "PASS" if logic_cnt == 0 else "FAIL"
    logic_sample_path = None
    if logic_cnt > 0:
        logic_sample_path = f"{args.output_dir}/samples/logic_inconsistency"
        write_sample(logic_bad.select("order_id", "pay_time_t", "ship_time_t", "is_refund_t"),
                     logic_sample_path, cfg["max_sample_rows"])
    add_check("Logical consistency check", logic_status, {"invalid_rows": logic_cnt}, logic_sample_path,
              note="Covers ship_time >= pay_time and the T+1 lag upper bound.")

    # 9) Outlier check (statistical, IQR)
    # Compute IQR bounds for quantity, unit_price_t and pay_amount_t and flag outliers
    stat_metrics = {}
    stat_outlier_df = None
    for col in ["quantity_t", "unit_price_t", "pay_amount_t"]:
        # Compute bounds on non-null values only. For pay_amount, restrict to non-refund rows:
        # refunds are negative by design and would otherwise create false positives.
        base_df = df_t.filter(F.col(col).isNotNull())
        if col == "unit_price_t":
            base_df = base_df.filter(F.col(col) >= 0)
        if col == "pay_amount_t":
            base_df = base_df.filter(F.col("is_refund_t") == F.lit(False))

        if base_df.count() == 0:
            continue
        bounds = iqr_bounds(base_df.withColumn(col, F.col(col).cast("double")), col)
        if not bounds:
            continue
        low, high = bounds
        outliers = df_t.filter(F.col(col).isNotNull() & ((F.col(col) < low) | (F.col(col) > high)))
        cnt = outliers.count()
        stat_metrics[col] = {"lower": low, "upper": high, "outliers": cnt}
        if cnt > 0:
            # Cast the value to double so the unions across metrics share one schema
            picked = outliers.select("order_id", F.lit(col).alias("metric"),
                                     F.col(col).cast("double").alias("value"))
            stat_outlier_df = picked if stat_outlier_df is None else stat_outlier_df.unionByName(picked)

    stat_status = "PASS" if all(m["outliers"] == 0 for m in stat_metrics.values()) else "WARN"
    stat_sample_path = None
    if stat_outlier_df is not None:
        stat_sample_path = f"{args.output_dir}/samples/stat_outliers"
        write_sample(stat_outlier_df, stat_sample_path, cfg["max_sample_rows"])
    add_check("Outlier check (statistical)", stat_status, stat_metrics, stat_sample_path,
              note="IQR method on row-level fields; holiday sales peaks are not flagged by this check.")

    # 10) Outlier check (business rules)
    # - pay_amount + discount <= quantity*unit_price + eps (deductions may lower, never exceed)
    # - refund orders: ship_time may or may not exist; no stricter amount equality is enforced
    # The amount relation is already covered by the value range check; this step adds
    # negative-discount and non-integer-quantity risk flags.
    br_bad = df_t.filter(
        (F.col("quantity_t").cast("double") % 1 != 0) |  # non-integer quantity (guards decimal sources)
        (F.col("discount_t") < 0)
    )
    br_cnt = br_bad.count()
    br_status = "PASS" if br_cnt == 0 else "FAIL"
    br_sample_path = None
    if br_cnt > 0:
        br_sample_path = f"{args.output_dir}/samples/business_rule_anomalies"
        write_sample(br_bad.select("order_id", "quantity_t", "discount_t"), br_sample_path, cfg["max_sample_rows"])
    add_check("Outlier check (business rules)", br_status, {"invalid_rows": br_cnt}, br_sample_path,
              note="Supplementary constraints: integer quantity, non-negative discount; "
                   "amount relations are validated in the value range check.")

    # 11) Time series continuity check (by calendar day of pay_time)
    date_col = to_date_in_tz(F.col("pay_time_t"), cfg["timezone"]).alias("pay_date")
    dfd = df_t.select(date_col).filter(F.col("pay_date").isNotNull())
    if dfd.count() > 0:
        min_date, max_date = dfd.agg(F.min("pay_date"), F.max("pay_date")).first()
        # Generate the full date sequence
        all_dates = spark.range(0, (max_date - min_date).days + 1) \
            .withColumn("pay_date", F.expr(f"date_add(to_date('{min_date}'), cast(id as int))")) \
            .select("pay_date")
        present = dfd.groupBy("pay_date").count().select("pay_date")
        missing = all_dates.join(present, on="pay_date", how="left_anti")
        missing_cnt = missing.count()
        ts_status = "PASS" if missing_cnt == 0 else "WARN"
        miss_sample_path = None
        if missing_cnt > 0:
            miss_sample_path = f"{args.output_dir}/samples/missing_dates"
            write_sample(missing, miss_sample_path, cfg["max_sample_rows"])
        add_check("Time series continuity check", ts_status,
                  {"min_date": str(min_date), "max_date": str(max_date), "missing_days": missing_cnt},
                  miss_sample_path,
                  note="Day-level continuity of pay_time; T+1 ingestion does not affect the date sequence.")
    else:
        add_check("Time series continuity check", "FAIL", {"reason": "no valid pay_time records"})

    # 12) Write the JSON report
    report_path = f"{args.output_dir}/integrity_report.json"
    with open("/tmp/integrity_report.json", "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    # Also save the report to distributed storage
    spark.createDataFrame([(json.dumps(report, ensure_ascii=False),)], ["json"]) \
        .coalesce(1).write.mode("overwrite").text(report_path)

    # 13) Done
    print(json.dumps(report, ensure_ascii=False))
    spark.stop()


if __name__ == "__main__":
    sys.exit(main())

5. Notes and tunables

  • Configuration:
    • valid_currencies: defaults to ["CNY"]; for multi-currency, pass --currencies CNY,USD,HKD.
    • t1_lag_days: T+1 ingestion constraint; the upper-bound check is pay_time <= now - t1_lag_days.
    • timezone: used for the day-level continuity split (default Asia/Shanghai).
  • Decimal places:
    • unit_price is limited to 2 decimals (format check). pay_amount and discount use Decimal(18,4) arithmetic to avoid precision loss; add the same check if 2 decimals must be enforced there too.
  • Refunds and installments:
    • Refund sign logic: is_refund=True => pay_amount <= 0; otherwise >= 0.
    • Installments/wallet deductions: no equality balance is enforced; a not-above-gross hard constraint plus statistical outlier detection is used instead.
  • Enum fields:
    • status/channel/province are not bound to concrete enums to avoid false positives; hook in dimension tables or configuration for set checks.
  • Performance and resources:
    • 4.8 million rows fit in a single job; IQR uses approximate percentiles to avoid a full sort.
    • Sample output is capped by max-sample-rows (default 10k rows per check) to limit storage pressure.

6. Suggested extensions

  • Dimension table alignment:
    • Join against dimension tables to validate province values, channel/status enums, and SKU whitelists.
  • Stronger amount reconciliation:
    • Add wallet_deduction, coupon_amount and shipping_fee fields and reconcile with gross = quantity*unit_price; paid = gross - coupon - wallet - discount +/- rounding, aligned to the finance definition.
  • Adaptive outlier thresholds:
    • Compute IQR per currency/channel group to reduce false positives from structural differences; window functions or groupBy + percentile_approx both work, as the sketch below shows.
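A minimal sketch of that grouped variant, assuming the typed columns (pay_amount_t, currency, channel) produced by the script above (Spark 3.1+ for F.percentile_approx):

from pyspark.sql import functions as F

# Per-(currency, channel) IQR fences via percentile_approx
grouped = df_t.groupBy("currency", "channel").agg(
    F.percentile_approx("pay_amount_t", [0.25, 0.75], 1000).alias("q"))
bounds = (grouped
          .withColumn("iqr", F.col("q")[1] - F.col("q")[0])
          .withColumn("lo", F.col("q")[0] - 1.5 * F.col("iqr"))
          .withColumn("hi", F.col("q")[1] + 1.5 * F.col("iqr")))
# Rows outside their own group's fences
outliers = (df_t.join(bounds, ["currency", "channel"])
            .where((F.col("pay_amount_t") < F.col("lo")) |
                   (F.col("pay_amount_t") > F.col("hi"))))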

This script covers the ten required integrity checks, produces auditable reports and samples, and avoids unreasonable false positives given the business characteristics (holiday peaks, installments, wallet deductions).

The following is a reusable PySpark data quality validation script for the online loan application dataset described above, covering the ten required check categories. It is configurable, extensible, switchable between batch and streaming, and outputs structured validation results plus violation samples.

Suggested file name: dq_loan_app_validation.py

Dependencies:

  • Spark 3.x
  • Python 3.8+
  • Input data may be parquet/csv/json (selected via a parameter)

Example invocation (batch):
spark-submit dq_loan_app_validation.py \
  --input /data/loan_apps/2024/ \
  --input-format parquet \
  --output /data/dq_out/loan_apps/ \
  --streaming false

Code:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""Loan application data quality validation script.

Covered checks:

  1. Missing value check
  2. Duplicate value check
  3. Outlier check (statistical: IQR / extreme quantiles)
  4. Outlier check (business rules, configurable)
  5. Data type consistency check and unification
  6. Primary key uniqueness check
  7. Value range check (enums / numeric)
  8. Format conformity check (IP / hash / time)
  9. Logical consistency check (cross-field)
  10. Time series continuity check (daily/hourly coverage and gaps)

Notes:

  - Each check only runs on fields present in the data; absent fields are skipped and logged.
  - Business thresholds are configurable. Defaults are conservative; tune them on historical data.
  - Results are written as structured detail sets plus summary metrics for auditing and reuse.
"""

import argparse
import sys

from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T

# -------------------------------
# Configuration (adjust as needed / externalize to JSON/YAML)
# -------------------------------

CONFIG = {
    "primary_key": "application_id",
    "timestamp_col": "app_datetime",
    "id_type_col": "id_type",
    "id_number_col": "id_number",  # already hashed / anonymized
    "core_numeric_cols": [
        "age", "income_monthly", "debt_total", "loan_amount",
        "loan_term_months", "employment_length_years", "credit_score"
    ],
    "categorical_cols": [
        "gender", "province", "marital_status", "education_level", "channel"
    ],
    "string_cols": ["device_id", "ip"],
    # Target (unified) types
    "schema_expectation": {
        "application_id": "string",
        "app_datetime": "timestamp",
        "id_type": "string",
        "id_number": "string",
        "age": "int",
        "gender": "string",
        "province": "string",
        "income_monthly": "double",
        "debt_total": "double",
        "loan_amount": "double",
        "loan_term_months": "int",
        "employment_length_years": "double",
        "marital_status": "string",
        "education_level": "string",
        "credit_score": "double",
        "device_id": "string",
        "ip": "string",
        "channel": "string"
    },
    # Enumerations (leave empty to disable the whitelist check for a field)
    "enums": {
        "id_type": ["ID_CARD", "PASSPORT", "OTHER"],
        "gender": ["M", "F", "U", "UNKNOWN", "OTHER"],
        "marital_status": ["SINGLE", "MARRIED", "DIVORCED", "WIDOWED", "OTHER"],
        "education_level": ["HIGH_SCHOOL", "BACHELOR", "MASTER", "PHD", "OTHER"],
        "channel": []  # empty = unrestricted; otherwise exact whitelist match
    },
    # Static numeric ranges (conservative defaults; None skips that bound)
    "ranges": {
        "age": {"min": 18, "max": 75},
        "loan_term_months": {"min": 1, "max": 120},
        "employment_length_years": {"min": 0, "max": 60},
        "income_monthly": {"min": 0, "max": None},
        "debt_total": {"min": 0, "max": None},
        "loan_amount": {"min": 0, "max": None},
        "credit_score": {"min": 0, "max": None}
    },
    # Business rules (switches and thresholds)
    "business_rules": {
        # app_datetime at most 5 minutes in the future, no older than today - 400 days (~13 months)
        "app_datetime_future_minutes": 5,
        "app_datetime_past_days": 400,
        # DTI = debt_total / income_monthly; if income = 0, DTI is null and flagged separately
        "max_dti_static": 5.0,
        # age >= employment_length_years + 14 (assumes earliest working age of 14)
        "min_age_minus_employment_years": 14,
        # Repeat application: same (id_number, loan_amount, loan_term_months) within 7 days
        "repeat_within_days": 7
    },
    # Format rules
    "format_checks": {
        # IPv4 regex
        "ipv4_regex": r"^(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)\.){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$",
        # Hash length whitelist (common: MD5=32, SHA-1=40, SHA-256=64); relax if unknown
        "id_hash_lengths": [32, 40, 64],
        # device_id: alphanumeric plus -:. with a generous length range
        "device_id_regex": r"^[A-Za-z0-9.:-]{8,100}$",
        # Empty province whitelist disables the check
        "province_whitelist": []
    },
    # Statistical outliers (IQR): 1.5 is the usual multiplier, 3 marks extremes
    "iqr_multiplier": 1.5,
    "extreme_iqr_multiplier": 3.0,
    # Sampling for violation exports
    "sample_rows": 100
}

# -------------------------------
# Utilities
# -------------------------------

def spark_type_from_str(t: str):
    m = {
        "string": T.StringType(),
        "int": T.IntegerType(),
        "bigint": T.LongType(),
        "double": T.DoubleType(),
        "float": T.FloatType(),
        "boolean": T.BooleanType(),
        "timestamp": T.TimestampType(),
        "date": T.DateType()
    }
    return m.get(t, T.StringType())


def ensure_columns(df: DataFrame, cols):
    exists = [c for c in cols if c in df.columns]
    missing = [c for c in cols if c not in df.columns]
    return exists, missing


def cast_with_flag(df: DataFrame, col: str, to_type: str):
    # Compute the ok-flag against the ORIGINAL column, then replace the column with the cast
    target_type = spark_type_from_str(to_type)
    cast_col = F.col(col).cast(target_type)
    ok_col = (F.when(F.col(col).isNull(), F.lit(True))
              .when(cast_col.isNull() & F.col(col).isNotNull(), F.lit(False))
              .otherwise(F.lit(True)))
    return df.withColumn(f"{col}__cast_ok", ok_col).withColumn(col, cast_col)


def unify_types(df: DataFrame, schema_expectation: dict):
    df2 = df
    present = [c for c in schema_expectation.keys() if c in df.columns]
    for c in present:
        df2 = cast_with_flag(df2, c, schema_expectation[c])
    return df2


def approx_iqr_bounds(df: DataFrame, col: str, multiplier: float):
    q = df.approxQuantile(col, [0.25, 0.75], 0.001)
    if not q or len(q) < 2:
        return None
    q1, q3 = q
    if q1 is None or q3 is None:
        return None
    iqr = q3 - q1
    return q1 - multiplier * iqr, q3 + multiplier * iqr


def write_df(df: DataFrame, path: str):
    if df is None:
        return
    df.coalesce(1).write.mode("overwrite").parquet(path)

# -------------------------------
# Checks
# -------------------------------

def check_missingness(df: DataFrame, cfg: dict) -> DataFrame:
    cols = [c for c in df.columns if not c.endswith("__cast_ok")]
    miss = df.agg(*[F.sum(F.col(c).isNull().cast("int")).alias(c) for c in cols])
    # Pivot the single-row aggregate into (col, missing_count) pairs
    pairs = []
    for c in miss.columns:
        pairs += [F.lit(c), F.col(c)]
    miss_long = miss.select(F.explode(F.create_map(*pairs)).alias("col", "missing_count"))
    return (miss_long
            .withColumn("total_rows", F.lit(df.count()))
            .withColumn("missing_rate", F.col("missing_count") / F.col("total_rows")))


def check_primary_key_uniqueness(df: DataFrame, pk: str) -> DataFrame:
    if pk not in df.columns:
        return None
    return df.groupBy(pk).count().where(F.col("count") > 1)


def check_exact_duplicates(df: DataFrame) -> DataFrame:
    # Fully identical rows (all columns equal)
    return df.groupBy(*df.columns).count().where(F.col("count") > 1)


def check_semantic_duplicates(df: DataFrame, cfg: dict) -> DataFrame:
    # Repeat applications: same (id_number, loan_amount, loan_term_months) within N days
    needed = ["id_number", "loan_amount", "loan_term_months", "app_datetime"]
    exists, _ = ensure_columns(df, needed)
    if len(exists) < 4:
        return None
    w = (Window.partitionBy("id_number", "loan_amount", "loan_term_months")
         .orderBy(F.col("app_datetime").cast("timestamp")))
    prev_ts = F.lag("app_datetime").over(w)
    seconds_diff = F.when(prev_ts.isNotNull(),
                          F.col("app_datetime").cast("long") - prev_ts.cast("long"))
    within = (df.withColumn("prev_app_datetime", prev_ts)
              .withColumn("seconds_from_prev", seconds_diff)
              .withColumn("repeat_within_days",
                          F.col("seconds_from_prev").isNotNull() &
                          (F.col("seconds_from_prev") <=
                           F.lit(cfg["business_rules"]["repeat_within_days"] * 86400))))
    return within.where(F.col("repeat_within_days"))


def check_type_consistency_flags(df: DataFrame) -> DataFrame:
    cast_cols = [c for c in df.columns if c.endswith("__cast_ok")]
    if not cast_cols:
        return None
    out = df.agg(*[F.sum((~F.col(c)).cast("int")).alias(c.replace("__cast_ok", "")) for c in cast_cols])
    pairs = []
    for c in out.columns:
        pairs += [F.lit(c), F.col(c)]
    return out.select(F.explode(F.create_map(*pairs)).alias("col", "failed_casts"))


def check_value_ranges(df: DataFrame, ranges: dict) -> DataFrame:
    pk_cols = [c for c in [CONFIG["primary_key"]] if c in df.columns]
    violations = None
    for col, r in ranges.items():
        if col not in df.columns:
            continue
        conds = []
        if r.get("min") is not None:
            conds.append(F.col(col) < F.lit(r["min"]))
        if r.get("max") is not None:
            conds.append(F.col(col) > F.lit(r["max"]))
        if not conds:
            continue
        v = (df.where(conds[0] if len(conds) == 1 else (conds[0] | conds[1]))
             .select(F.lit(col).alias("col"),
                     F.col(col).cast("double").alias("value"),  # uniform type for the union
                     *[F.col(c) for c in pk_cols]))
        violations = v if violations is None else violations.unionByName(v)
    return violations


def check_enums(df: DataFrame, enums: dict) -> DataFrame:
    pk_cols = [c for c in [CONFIG["primary_key"]] if c in df.columns]
    violations = None
    for col, allowed in enums.items():
        if col not in df.columns or not allowed:
            continue
        v = (df.where(~F.col(col).isin(allowed) & F.col(col).isNotNull())
             .select(F.lit(col).alias("col"), F.col(col).alias("value"),
                     *[F.col(c) for c in pk_cols]))
        violations = v if violations is None else violations.unionByName(v)
    return violations


def check_formats(df: DataFrame, cfg: dict) -> DataFrame:
    fc = cfg["format_checks"]
    pk_cols = [c for c in [CONFIG["primary_key"]] if c in df.columns]
    viols = None
    # IP format
    if "ip" in df.columns:
        v1 = (df.where(F.col("ip").isNotNull() & ~F.col("ip").rlike(fc["ipv4_regex"]))
              .select(F.lit("ip").alias("col"), F.col("ip").alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v1 if viols is None else viols.unionByName(v1)
    # id_number hash length
    idc = CONFIG["id_number_col"]
    if idc in df.columns:
        v2 = (df.where(F.col(idc).isNotNull() &
                       ~F.length(F.col(idc)).isin(fc["id_hash_lengths"]))
              .select(F.lit(idc).alias("col"), F.col(idc).alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v2 if viols is None else viols.unionByName(v2)
    # device_id pattern
    if "device_id" in df.columns:
        v3 = (df.where(F.col("device_id").isNotNull() &
                       ~F.col("device_id").rlike(fc["device_id_regex"]))
              .select(F.lit("device_id").alias("col"), F.col("device_id").alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v3 if viols is None else viols.unionByName(v3)
    # Province whitelist
    if "province" in df.columns and fc["province_whitelist"]:
        v4 = (df.where(F.col("province").isNotNull() &
                       ~F.col("province").isin(fc["province_whitelist"]))
              .select(F.lit("province").alias("col"), F.col("province").alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v4 if viols is None else viols.unionByName(v4)
    return viols


def check_business_rules(df: DataFrame, cfg: dict) -> DataFrame:
    br = cfg["business_rules"]
    ts_col = CONFIG["timestamp_col"]
    pk_cols = [c for c in [CONFIG["primary_key"]] if c in df.columns]
    viols = None
    # Time window: at most future_minutes ahead, no older than past_days
    if ts_col in df.columns:
        v1 = (df.where(
                (F.col(ts_col) > F.expr(f"timestampadd(MINUTE, {br['app_datetime_future_minutes']}, current_timestamp())")) |
                (F.col(ts_col) < F.expr(f"timestampadd(DAY, -{br['app_datetime_past_days']}, current_timestamp())")))
              .select(F.lit("app_datetime_window").alias("rule"),
                      F.col(ts_col).cast("string").alias("value"),  # string: one type across all rules
                      *[F.col(c) for c in pk_cols]))
        viols = v1 if viols is None else viols.unionByName(v1)
    # DTI
    if "debt_total" in df.columns and "income_monthly" in df.columns:
        dti = df.withColumn("DTI", F.when(F.col("income_monthly") > 0,
                                          F.col("debt_total") / F.col("income_monthly")))
        v2 = (dti.where(F.col("DTI").isNotNull() & (F.col("DTI") > F.lit(br["max_dti_static"])))
              .select(F.lit("DTI_gt_static").alias("rule"),
                      F.col("DTI").cast("string").alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v2 if viols is None else viols.unionByName(v2)
        # income = 0 but debt > 0
        v2b = (df.where((F.col("income_monthly") <= 0) & (F.col("debt_total") > 0))
               .select(F.lit("income_zero_but_debt_positive").alias("rule"),
                       F.to_json(F.struct("income_monthly", "debt_total")).alias("value"),
                       *[F.col(c) for c in pk_cols]))
        viols = v2b if viols is None else viols.unionByName(v2b)
    # Age versus employment length
    if "age" in df.columns and "employment_length_years" in df.columns:
        v3 = (df.where(F.col("age") <
                       (F.col("employment_length_years") + F.lit(br["min_age_minus_employment_years"])))
              .select(F.lit("age_vs_employment_years").alias("rule"),
                      F.to_json(F.struct("age", "employment_length_years")).alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v3 if viols is None else viols.unionByName(v3)
    # loan_amount must be > 0
    if "loan_amount" in df.columns:
        v4 = (df.where(F.col("loan_amount") <= 0)
              .select(F.lit("loan_amount_positive").alias("rule"),
                      F.col("loan_amount").cast("string").alias("value"),
                      *[F.col(c) for c in pk_cols]))
        viols = v4 if viols is None else viols.unionByName(v4)
    return viols


def check_statistical_outliers(df: DataFrame, cfg: dict) -> DataFrame:
    pk_cols = [c for c in [CONFIG["primary_key"]] if c in df.columns]
    violations = None
    for col in cfg["core_numeric_cols"]:
        if col not in df.columns:
            continue
        bounds = approx_iqr_bounds(df.where(F.col(col).isNotNull()), col, cfg["iqr_multiplier"])
        if not bounds:
            continue
        lower, upper = bounds
        v = (df.where((F.col(col) < F.lit(lower)) | (F.col(col) > F.lit(upper)))
             .select(F.lit(col).alias("col"), F.col(col).cast("double").alias("value"),
                     F.lit(lower).alias("iqr_lower"), F.lit(upper).alias("iqr_upper"),
                     *[F.col(c) for c in pk_cols]))
        violations = v if violations is None else violations.unionByName(v)
    return violations


def check_time_continuity(df: DataFrame, cfg: dict):
    # Continuity: aggregate per day/hour and locate gaps
    ts = CONFIG["timestamp_col"]
    if ts not in df.columns:
        return None, None
    daily = df.groupBy(F.to_date(F.col(ts)).alias("date")).count()
    hourly = df.groupBy(F.date_format(F.col(ts), "yyyy-MM-dd HH:00:00").alias("hour")).count()

    # Full date sequence
    min_max = df.agg(F.min(F.to_date(F.col(ts))).alias("min_d"),
                     F.max(F.to_date(F.col(ts))).alias("max_d")).collect()[0]
    if min_max["min_d"] is None or min_max["max_d"] is None:
        return None, None
    min_d, max_d = min_max["min_d"], min_max["max_d"]

    spark = df.sparkSession
    # spark.range needs a Python int, so compute the day span on the driver
    dates = (spark.range(0, (max_d - min_d).days + 1)
             .withColumn("date", F.expr(f"date_add(to_date('{min_d}'), cast(id as int))"))
             .select("date"))
    daily_full = dates.join(daily, on="date", how="left").fillna({"count": 0})
    missing_days = daily_full.where(F.col("count") == 0)

    # Hourly sequence (truncated to whole hours)
    min_max_h = df.agg(F.min(F.date_trunc("hour", F.col(ts))).alias("min_h"),
                       F.max(F.date_trunc("hour", F.col(ts))).alias("max_h")).collect()[0]
    if min_max_h["min_h"] is None or min_max_h["max_h"] is None:
        return missing_days, None
    min_h, max_h = min_max_h["min_h"], min_max_h["max_h"]
    n_hours = int((max_h.timestamp() - min_h.timestamp()) // 3600) + 1
    hours = (spark.range(0, n_hours)
             .withColumn("hour", F.expr(f"timestampadd(HOUR, cast(id as int), timestamp('{min_h}'))"))
             .select("hour"))
    hourly_full = hours.join(hourly.withColumn("hour", F.col("hour").cast("timestamp")),
                             on="hour", how="left").fillna({"count": 0})
    missing_hours = hourly_full.where(F.col("count") == 0)

    # Return the gaps at both granularities
    return missing_days, missing_hours

# -------------------------------
# Main flow
# -------------------------------

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="Input path (directory or file)")
    parser.add_argument("--input-format", default="parquet", choices=["parquet", "csv", "json"], help="Input format")
    parser.add_argument("--output", required=True, help="Output directory")
    parser.add_argument("--streaming", default="false", choices=["true", "false"], help="Use Structured Streaming")
    parser.add_argument("--csv-header", default="true", choices=["true", "false"], help="CSV has a header row")
    args = parser.parse_args()

    spark = SparkSession.builder.appName("DQ_Loan_Applications").getOrCreate()
    spark.sparkContext.setLogLevel("WARN")

    # Load
    if args.streaming == "true":
        # Streaming mode (illustrative): folder source, micro-batch execution. File sources
        # require an explicit schema; aggregations such as approxQuantile need a batch layer.
        stream_schema = T.StructType([T.StructField(c, spark_type_from_str(t), True)
                                      for c, t in CONFIG["schema_expectation"].items()])
        if args.input_format == "parquet":
            df = spark.readStream.schema(stream_schema).parquet(args.input)
        elif args.input_format == "csv":
            df = spark.readStream.option("header", args.csv_header == "true").schema(stream_schema).csv(args.input)
        else:
            df = spark.readStream.schema(stream_schema).json(args.input)
        # Simplification: streaming only demonstrates the structure here; run the
        # statistical/IQR checks as offline batches in production.
        print("Note: in streaming mode, run statistical/IQR checks as offline batches; "
              "this script only demonstrates the streaming structure.")
        sys.exit(0)
    else:
        if args.input_format == "parquet":
            df = spark.read.parquet(args.input)
        elif args.input_format == "csv":
            df = spark.read.option("header", args.csv_header == "true").option("inferSchema", "true").csv(args.input)
        else:
            df = spark.read.json(args.input)

    # Unify types and flag failed casts
    df = unify_types(df, CONFIG["schema_expectation"])

    # 1) Missing value check
    missing_report = check_missingness(df, CONFIG)
    write_df(missing_report, f"{args.output}/missing_report")

    # 2) Duplicate checks
    pk_dups = check_primary_key_uniqueness(df, CONFIG["primary_key"])
    write_df(pk_dups, f"{args.output}/primary_key_duplicates")

    exact_dups = check_exact_duplicates(df)
    write_df(exact_dups, f"{args.output}/exact_duplicates")

    semantic_dups = check_semantic_duplicates(df, CONFIG)
    write_df(semantic_dups, f"{args.output}/semantic_duplicates")

    # 3) Statistical outliers (IQR)
    stat_outliers = check_statistical_outliers(df, CONFIG)
    write_df(stat_outliers, f"{args.output}/statistical_outliers")

    # 4) Business rule violations
    biz_violations = check_business_rules(df, CONFIG)
    write_df(biz_violations, f"{args.output}/business_rule_violations")

    # 5) Type consistency (failed cast counts)
    type_cast_report = check_type_consistency_flags(df)
    write_df(type_cast_report, f"{args.output}/type_cast_report")

    # 6) Primary key uniqueness (details already written in step 2); keep the count
    pk_dup_count = pk_dups.count() if pk_dups is not None else None

    # 7) Value range checks (numeric ranges and enums)
    range_violations = check_value_ranges(df, CONFIG["ranges"])
    write_df(range_violations, f"{args.output}/range_violations")

    enum_violations = check_enums(df, CONFIG["enums"])
    write_df(enum_violations, f"{args.output}/enum_violations")

    # 8) Format conformity check
    format_violations = check_formats(df, CONFIG)
    write_df(format_violations, f"{args.output}/format_violations")

    # 9) Logical consistency: the main cross-field logic is covered by the business rules;
    #    extend check_business_rules for additional rules.

    # 10) Time series continuity check
    missing_days, missing_hours = check_time_continuity(df, CONFIG)
    write_df(missing_days, f"{args.output}/missing_days")
    write_df(missing_hours, f"{args.output}/missing_hours")

    # Summary metrics
    total_rows = df.count()

    def cnt(df_):
        return None if df_ is None else df_.count()

    summary_rows = [
        ("total_rows", total_rows),
        ("missing_report_cols", cnt(missing_report)),
        ("primary_key_dup_count", pk_dup_count),
        ("exact_duplicates_groups", cnt(exact_dups)),
        ("semantic_duplicates_rows", cnt(semantic_dups)),
        ("statistical_outliers_rows", cnt(stat_outliers)),
        ("business_rule_violations_rows", cnt(biz_violations)),
        ("type_cast_failed_cols", cnt(type_cast_report)),
        ("range_violations_rows", cnt(range_violations)),
        ("enum_violations_rows", cnt(enum_violations)),
        ("format_violations_rows", cnt(format_violations)),
        ("missing_days", cnt(missing_days)),
        ("missing_hours", cnt(missing_hours)),
    ]

    summary_df = spark.createDataFrame(summary_rows, schema=["metric", "value"])
    write_df(summary_df, f"{args.output}/summary_metrics")

    # Export samples of each violation type for quick triage
    sample_targets = {
        "statistical_outliers": stat_outliers,
        "business_rule_violations": biz_violations,
        "range_violations": range_violations,
        "enum_violations": enum_violations,
        "format_violations": format_violations,
        "semantic_duplicates": semantic_dups,
        "primary_key_duplicates": pk_dups
    }
    for name, d in sample_targets.items():
        if d is not None and d.count() > 0:
            write_df(d.limit(CONFIG["sample_rows"]), f"{args.output}/samples/{name}")

    print("DQ checks finished. Summary metrics:")
    summary_df.show(truncate=False)

    spark.stop()


if __name__ == "__main__":
    main()

Implementation and extension notes:

  • Externalized configuration: move CONFIG into a JSON/YAML file loaded via a command-line argument, so rules can differ per channel or batch.
  • Per-channel completeness: on top of the missing value check, add a completeness pivot per channel (groupBy('channel') with per-column missing rates, as sketched below) to locate channel-specific issues.
  • Risk feature drift: add monthly/per-channel distribution statistics compared against a baseline to catch sudden input shifts (not required in this script; better placed in a data monitoring layer).
  • Streaming adaptation: for Structured Streaming, clean at the ODS layer (type unification/format checks) and persist, then run the statistical/IQR/continuity checks as offline batches for stable, auditable results.
  • Metric persistence: write summary_metrics and the violation details to Hive/Delta and generate daily reports/alerts.
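A minimal sketch of that per-channel completeness pivot, assuming the df and column names from the script above (the column selection is illustrative):

from pyspark.sql import functions as F

# Average of an is-null indicator = missing rate, computed per channel
cols = ["income_monthly", "credit_score", "device_id"]  # illustrative subset
per_channel = df.groupBy("channel").agg(
    F.count(F.lit(1)).alias("rows"),
    *[F.avg(F.col(c).isNull().cast("int")).alias(f"{c}_missing_rate") for c in cols])
per_channel.show(truncate=False)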

The following is a scalable data integrity validation script for high-volume, minute-level industrial sensor data (about 800 million rows over 90 days). It uses PySpark and covers:

  • Missing value check
  • Duplicate value check
  • Outlier check (statistical, IQR)
  • Outlier check (business rules)
  • Data type consistency check (incl. regex/enums)
  • Primary key uniqueness check (device_id + timestamp)
  • Value range check
  • Format conformity check (IDs / version strings)
  • Logical consistency check (incl. pre/post-firmware-upgrade scale and distribution consistency)
  • Time series continuity check (minute-level gaps / jumps / retransmissions)

Notes and assumptions:

  • Run incrementally by partition (e.g. site/date) to avoid a single full-table scan; a partition-pruned read is sketched below.
  • Statistical outliers use per-device approximate quantiles (percentile_approx), which is robust and scales to large data.
  • The firmware-upgrade consistency check is optional and enabled by default; windows and thresholds are configurable.
  • The business rule bounds are defaults; confirm and adjust them on site before relying on them.
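A minimal sketch of such a partition-pruned incremental read; the path and the site/date partition columns are illustrative assumptions, not fixed by the script below:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# Literal predicates on partition columns let Spark prune to one partition
batch = (spark.read.format("delta")
         .load("dbfs:/data/sensor_delta")          # hypothetical path
         .where("site = 'plant_a' AND date = '2024-06-01'"))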

Script (PySpark 3.x):

from functools import reduce

from pyspark.sql import SparkSession, functions as F, types as T, Window as W

# -----------------------------
# Configuration (adjust as needed)
# -----------------------------

CONFIG = {
    "input": {
        "format": "delta",                   # delta/parquet/table
        "path_or_table": "db.sensor_delta"   # path or table name
    },
    "output": {
        "base_path": "dbfs:/tmp/quality_checks",  # or an HDFS/S3 path
        "save_mode": "overwrite"                  # overwrite/append
    },
    "spark": {
        "shuffle_partitions": 2000,
        "adaptive_enabled": "true"
    },
    "schema": {
        "device_id": T.StringType(),
        "timestamp": T.TimestampType(),   # UTC
        "temperature": T.DoubleType(),    # °C
        "humidity": T.DoubleType(),       # %RH
        "vibration_rms": T.DoubleType(),  # g
        "status": T.StringType(),         # enum
        "firmware_version": T.StringType(),
        "line_id": T.StringType(),
        "site": T.StringType()
    },
    "primary_key": ["device_id", "timestamp"],
    "sampling": {
        "interval_seconds": 60,
        "expect_minute_aligned": True,      # timestamps should align to whole minutes (seconds = 0)
        "alignment_tolerance_seconds": 0    # set > 0 to allow jitter
    },
    "domain_rules": {
        "status_enum": ["RUNNING", "IDLE", "MAINTENANCE", "OFF", "ERROR"],
        "device_id_regex": r"^[A-Za-z0-9_.-]+$",
        "firmware_semver_regex": r"^\d+\.\d+\.\d+(?:-[A-Za-z0-9]+)?$",
        "temperature_range": [-40.0, 125.0],   # °C (example)
        "humidity_range": [0.0, 100.0],        # %RH (physical bounds)
        "vibration_rms_range": [0.0, 50.0],    # g (example; confirm the ceiling on site)
        "rate_limits_per_min": {               # max change rate per minute (examples)
            "temperature": 10.0,    # °C/min
            "humidity": 20.0,       # %RH/min
            "vibration_rms": 5.0    # g/min
        }
    },
    "stat_outlier": {
        "use_iqr": True,
        "iqr_k": 3.0,                  # beyond IQR*3 counts as an outlier
        "group_level": "device_id",    # quantile statistics per device
        "approx_pct": [0.25, 0.5, 0.75],
        "rel_error": 1e-3
    },
    "time_bounds": {  # timestamp plausibility (optional)
        "min_days_ago": 120,   # no earlier than now - 120 days
        "max_hours_ahead": 1   # no later than now + 1 hour
    },
    "firmware_consistency_check": {
        "enabled": True,
        "pre_window_minutes": 60,
        "post_window_minutes": 60,
        "status_for_analysis": ["RUNNING"],  # compare only under running conditions
        "ratio_tolerance": 0.15,  # allowed deviation of the pre/post median ratio (15%)
        "metrics": ["temperature", "humidity", "vibration_rms"]
    }
}

# -----------------------------
# Session and input
# -----------------------------

spark = SparkSession.builder.appName("SensorDataQualityChecks").getOrCreate()
spark.conf.set("spark.sql.shuffle.partitions", str(CONFIG["spark"]["shuffle_partitions"]))
spark.conf.set("spark.sql.adaptive.enabled", CONFIG["spark"]["adaptive_enabled"])

expected_schema = T.StructType([T.StructField(k, v, True) for k, v in CONFIG["schema"].items()])

input_cfg = CONFIG["input"]
if input_cfg["format"] == "table":
    df_raw = spark.table(input_cfg["path_or_table"])
elif input_cfg["format"] in ("delta", "parquet"):
    df_raw = spark.read.format(input_cfg["format"]).schema(expected_schema).load(input_cfg["path_or_table"])
else:
    raise ValueError("Unsupported input format")

df = df_raw.select([F.col(c) for c in CONFIG["schema"].keys()])

# -----------------------------
# Type unification and basic derived columns
# -----------------------------

# Force-cast once more, to be safe
for c, t in CONFIG["schema"].items():
    df = df.withColumn(c, F.col(c).cast(t))

# Helper column for timestamp alignment checks
df = df.withColumn("unix_ts", F.col("timestamp").cast("long"))

# -----------------------------
# 1) Missing value check
# -----------------------------

missing_counts = df.select([F.sum(F.col(c).isNull().cast("bigint")).alias(c) for c in df.columns])
missing_counts.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/missing_counts")

# -----------------------------
# 2) Data type consistency and format conformity checks
# -----------------------------

# Enum / regex checks
status_ok = F.col("status").isin(CONFIG["domain_rules"]["status_enum"])
device_id_ok = F.col("device_id").rlike(CONFIG["domain_rules"]["device_id_regex"])
fw_ok = (F.col("firmware_version").rlike(CONFIG["domain_rules"]["firmware_semver_regex"]) |
         F.col("firmware_version").isNull())

dtype_violations = df.where(~status_ok | ~device_id_ok | ~fw_ok)
dtype_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/dtype_format_violations")

# -----------------------------
# 3) Primary key uniqueness and duplicate checks
# -----------------------------

pk = CONFIG["primary_key"]
dups = df.groupBy(*pk).count().where(F.col("count") > 1)
dups.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/duplicate_pk")

# Full-row duplicates (optional): hash key + all fields and look for repeats
row_hash = F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("NULL")) for c in df.columns]), 256)
df_with_hash = df.withColumn("row_hash", row_hash)
full_dups = df_with_hash.groupBy("row_hash").count().where(F.col("count") > 1)
full_dups.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/duplicate_rows")

# -----------------------------
# 4) Value range check (physical/business bounds)
# -----------------------------

r = CONFIG["domain_rules"]
range_violations = df.where(
    (F.col("temperature").isNotNull() &
     ((F.col("temperature") < F.lit(r["temperature_range"][0])) |
      (F.col("temperature") > F.lit(r["temperature_range"][1])))) |
    (F.col("humidity").isNotNull() &
     ((F.col("humidity") < F.lit(r["humidity_range"][0])) |
      (F.col("humidity") > F.lit(r["humidity_range"][1])))) |
    (F.col("vibration_rms").isNotNull() &
     ((F.col("vibration_rms") < F.lit(r["vibration_rms_range"][0])) |
      (F.col("vibration_rms") > F.lit(r["vibration_rms_range"][1]))))
)
range_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/range_violations")

# -----------------------------
# 5) Time series continuity check (minute level)
# -----------------------------

w_dev_time = W.partitionBy("device_id").orderBy("timestamp")
df_seq = (df.withColumn("prev_ts", F.lag("timestamp").over(w_dev_time))
          .withColumn("prev_unix", F.col("prev_ts").cast("long"))
          .withColumn("delta_sec", F.col("unix_ts") - F.col("prev_unix")))

# Gaps and retransmissions
interval = CONFIG["sampling"]["interval_seconds"]
gaps = (df_seq.where(F.col("prev_ts").isNotNull() & (F.col("delta_sec") > interval))
        .withColumn("missing_points", (F.col("delta_sec") / interval - F.lit(1)).cast("bigint")))
gaps.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/time_gaps")

retransmissions = df_seq.where(F.col("prev_ts").isNotNull() & (F.col("delta_sec") == 0))
retransmissions.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/retransmissions")

# Alignment (seconds within the minute = 0)
if CONFIG["sampling"]["expect_minute_aligned"]:
    aligned = df.withColumn("sec_in_min", (F.col("unix_ts") % 60).cast("int"))
    misaligned = aligned.where(F.abs(F.col("sec_in_min")) > CONFIG["sampling"]["alignment_tolerance_seconds"])
    misaligned.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
        f"{CONFIG['output']['base_path']}/misaligned_timestamps")

# Timestamp plausibility bounds (optional)
upper_bound = F.expr(f"timestampadd(HOUR, {CONFIG['time_bounds']['max_hours_ahead']}, current_timestamp())")
lower_bound = F.expr(f"timestampadd(DAY, -{CONFIG['time_bounds']['min_days_ago']}, current_timestamp())")
time_violations = df.where((F.col("timestamp") < lower_bound) | (F.col("timestamp") > upper_bound))
time_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/time_violations")

# -----------------------------
# 6) Outlier check (statistical, IQR per device)
# -----------------------------

pct = CONFIG["stat_outlier"]["approx_pct"]
rel_err = CONFIG["stat_outlier"]["rel_error"]
group_col = CONFIG["stat_outlier"]["group_level"]
iqr_k = CONFIG["stat_outlier"]["iqr_k"]

def iqr_bounds(df_in, col):
    # Per-group Q1/Q2/Q3 via percentile_approx, then IQR fences
    return (df_in.groupBy(group_col).agg(
                F.expr(f"percentile_approx({col}, array({pct[0]},{pct[1]},{pct[2]}), {int(1/rel_err)})").alias("p"))
            .select(F.col(group_col),
                    F.col("p").getItem(0).alias("q1"),
                    F.col("p").getItem(1).alias("q2"),
                    F.col("p").getItem(2).alias("q3"))
            .withColumn("iqr", F.col("q3") - F.col("q1"))
            .withColumn("lo", F.col("q1") - F.lit(iqr_k) * F.col("iqr"))
            .withColumn("hi", F.col("q3") + F.lit(iqr_k) * F.col("iqr")))

metrics = ["temperature", "humidity", "vibration_rms"]
outlier_union = None

for m in metrics:
    stats_m = iqr_bounds(df.where(F.col(m).isNotNull()), m)
    flagged = (df.join(stats_m, on=group_col, how="left")
               .where(F.col(m).isNotNull() & ((F.col(m) < F.col("lo")) | (F.col(m) > F.col("hi"))))
               .select("device_id", "timestamp", F.lit(m).alias("metric"), F.col(m).alias("value"),
                       "lo", "hi", "q1", "q2", "q3"))
    outlier_union = flagged if outlier_union is None else outlier_union.unionByName(flagged)

if outlier_union is not None:
    outlier_union.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
        f"{CONFIG['output']['base_path']}/stat_outliers")

# -----------------------------
# 7) Outlier check (business rules: change rate / operating state)
# -----------------------------

rate = CONFIG["domain_rules"]["rate_limits_per_min"]
df_rate = (df.select("device_id", "timestamp", "unix_ts", "temperature", "humidity", "vibration_rms")
           .withColumn("prev_temperature", F.lag("temperature").over(w_dev_time))
           .withColumn("prev_humidity", F.lag("humidity").over(w_dev_time))
           .withColumn("prev_vibration_rms", F.lag("vibration_rms").over(w_dev_time))
           .withColumn("prev_ts", F.lag("timestamp").over(w_dev_time))
           .withColumn("delta_min", (F.col("unix_ts") - F.col("prev_ts").cast("long")) / 60.0))

biz_violations = (df_rate.where(F.col("prev_ts").isNotNull() & (F.col("delta_min") > 0))
                  .select("device_id", "timestamp", "delta_min",
                          (F.abs(F.col("temperature") - F.col("prev_temperature")) / F.col("delta_min") > F.lit(rate["temperature"])).alias("temp_rate_flag"),
                          (F.abs(F.col("humidity") - F.col("prev_humidity")) / F.col("delta_min") > F.lit(rate["humidity"])).alias("hum_rate_flag"),
                          (F.abs(F.col("vibration_rms") - F.col("prev_vibration_rms")) / F.col("delta_min") > F.lit(rate["vibration_rms"])).alias("vib_rate_flag"),
                          "temperature", "prev_temperature", "humidity", "prev_humidity",
                          "vibration_rms", "prev_vibration_rms")
                  .where("temp_rate_flag OR hum_rate_flag OR vib_rate_flag"))

biz_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/biz_rule_outliers")

# -----------------------------
# 8) Logical consistency: firmware version must not regress;
#    pre/post-upgrade scale/distribution must stay consistent
# -----------------------------

def parse_semver_col(col):
    # Extract major/minor/patch; non-matching versions become NULL
    return F.when(
        F.col("firmware_version").rlike(CONFIG["domain_rules"]["firmware_semver_regex"]),
        F.struct(
            F.regexp_extract(col, r"^(\d+)", 1).cast("int").alias("maj"),
            F.regexp_extract(col, r"^\d+\.(\d+)", 1).cast("int").alias("min"),
            F.regexp_extract(col, r"^\d+\.\d+\.(\d+)", 1).cast("int").alias("pat")
        )).otherwise(F.lit(None))

df_fw = df.withColumn("semver", parse_semver_col(F.col("firmware_version")))
w_fw = W.partitionBy("device_id").orderBy("timestamp")
df_fw = df_fw.withColumn("prev_semver", F.lag("semver").over(w_fw))

# Version downgrades (non-null comparison)
downgrades = df_fw.where(F.col("semver").isNotNull() & F.col("prev_semver").isNotNull() & (
    (F.col("semver.maj") < F.col("prev_semver.maj")) |
    ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") < F.col("prev_semver.min"))) |
    ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") == F.col("prev_semver.min")) &
     (F.col("semver.pat") < F.col("prev_semver.pat")))
))
downgrades.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/firmware_downgrades")

# Locate upgrade events
upgrades = df_fw.where(F.col("semver").isNotNull() & F.col("prev_semver").isNotNull() & (
    (F.col("semver.maj") > F.col("prev_semver.maj")) |
    ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") > F.col("prev_semver.min"))) |
    ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") == F.col("prev_semver.min")) &
     (F.col("semver.pat") > F.col("prev_semver.pat")))
)).select("device_id", "timestamp", "firmware_version").withColumnRenamed("timestamp", "upgrade_ts")

if CONFIG["firmware_consistency_check"]["enabled"]:
    pre_m = CONFIG["firmware_consistency_check"]["pre_window_minutes"]
    post_m = CONFIG["firmware_consistency_check"]["post_window_minutes"]
    run_status = CONFIG["firmware_consistency_check"]["status_for_analysis"]
    metrics_for_fw = CONFIG["firmware_consistency_check"]["metrics"]
    tol = CONFIG["firmware_consistency_check"]["ratio_tolerance"]

    # Window-join upgrade events with same-device data (bounded windows limit join blow-up)
    df_run = df.where(F.col("status").isin(run_status)).select("device_id", "timestamp", *metrics_for_fw)

    # Expand each upgrade event into its pre/post window bounds
    upgrades_range = (upgrades
                      .withColumn("pre_start", F.expr(f"timestampadd(MINUTE, -{pre_m}, upgrade_ts)"))
                      .withColumn("post_end", F.expr(f"timestampadd(MINUTE, {post_m}, upgrade_ts)")))

    # Median per pre/post window
    pre_join = df_run.alias("d").join(
        upgrades_range.alias("u"),
        (F.col("d.device_id") == F.col("u.device_id")) &
        (F.col("d.timestamp") >= F.col("u.pre_start")) &
        (F.col("d.timestamp") < F.col("u.upgrade_ts")),
        "inner")
    post_join = df_run.alias("d").join(
        upgrades_range.alias("u"),
        (F.col("d.device_id") == F.col("u.device_id")) &
        (F.col("d.timestamp") > F.col("u.upgrade_ts")) &
        (F.col("d.timestamp") <= F.col("u.post_end")),
        "inner")

    pre_aggs = pre_join.groupBy("u.device_id", "u.upgrade_ts").agg(
        *[F.expr(f"percentile_approx({m}, 0.5, 1000)").alias(f"pre_{m}_p50") for m in metrics_for_fw])
    post_aggs = post_join.groupBy("u.device_id", "u.upgrade_ts").agg(
        *[F.expr(f"percentile_approx({m}, 0.5, 1000)").alias(f"post_{m}_p50") for m in metrics_for_fw])
    fw_cmp = pre_aggs.join(post_aggs, ["device_id", "upgrade_ts"], "inner")

    # Ratio deviation per metric
    for m in metrics_for_fw:
        fw_cmp = fw_cmp.withColumn(f"{m}_ratio", F.col(f"post_{m}_p50") / F.col(f"pre_{m}_p50"))

    # Flag events beyond the tolerance (functools.reduce: there is no F.reduce)
    conds = [(F.col(f"{m}_ratio") > (1 + tol)) | (F.col(f"{m}_ratio") < (1 - tol)) for m in metrics_for_fw]
    fw_inconsistent = fw_cmp.where(reduce(lambda a, b: a | b, conds))
    fw_inconsistent.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
        f"{CONFIG['output']['base_path']}/firmware_metric_inconsistency")

# -----------------------------
# 9) Logical consistency (extensible example)
# Example: with status OFF/MAINTENANCE, vibration_rms should be near 0 (if applicable; confirm on site)
# -----------------------------

logic_examples = df.where(
    F.col("status").isin(["OFF", "MAINTENANCE"]) &
    F.col("vibration_rms").isNotNull() &
    (F.col("vibration_rms") > 0.2)  # example threshold; adjust on site
)
logic_examples.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/logic_inconsistency_examples")

# -----------------------------
# 10) Summary metrics
# -----------------------------

summary = {}

# Total row count
summary["total_rows"] = df.count()
summary_df = spark.createDataFrame([(k, str(v)) for k, v in summary.items()],
                                   schema="metric string, value string")
summary_df.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(
    f"{CONFIG['output']['base_path']}/summary")

# Key violation counts (stored as strings so appends match the summary schema)
def save_count(df_in, name):
    cnt = df_in.count()
    (spark.createDataFrame([(name, str(cnt))], "metric string, value string")
     .write.mode("append").format("delta").save(f"{CONFIG['output']['base_path']}/summary"))

save_count(dups, "duplicate_pk")
save_count(full_dups, "duplicate_rows")
save_count(range_violations, "range_violations")
save_count(gaps, "time_gaps_events")
save_count(retransmissions, "retransmissions")
if CONFIG["sampling"]["expect_minute_aligned"]:
    save_count(misaligned, "misaligned_timestamps")
if outlier_union is not None:
    save_count(outlier_union, "stat_outliers")
save_count(biz_violations, "biz_rule_outliers")
save_count(dtype_violations, "dtype_format_violations")
save_count(time_violations, "time_violations")
save_count(downgrades, "firmware_downgrades")
if CONFIG["firmware_consistency_check"]["enabled"]:
    save_count(fw_inconsistent, "firmware_metric_inconsistency")
save_count(logic_examples, "logic_inconsistency_examples")

print("Data quality checks completed. Results saved to:", CONFIG["output"]["base_path"])

Usage notes:

  • Performance and resources:
    • Strongly prefer partitioned batches (site/date or a device_id hash) to reduce shuffle and join blow-up.
    • percentile_approx with device-level aggregation stays robust into the thousands of devices; with far more devices, aggregate at site/line level instead.
  • Statistical outlier policy:
    • IQR*3 is robust to spikes; lower to IQR*1.5 for more sensitivity, or switch to median+MAD (requires a second aggregation).
  • Business rules:
    • The temperature, humidity and vibration bounds are examples; revise them against device specs and site limits.
    • Change-rate thresholds must be calibrated to sensor response and physical limits to avoid over-flagging.
  • Firmware consistency:
    • Median ratios over 60-minute windows before/after an upgrade detect scale/calibration drift. If load shifts markedly inside the window, add filters (e.g. a stable-operating-state flag) or lengthen the window.
  • Time continuity:
    • If gateways introduce alignment jitter, relax alignment_tolerance_seconds or evaluate on timestamps resampled by rounding to the minute, as sketched below.
  • Output:
    • Every sub-check writes a detailed Delta dataset; summary holds the core counts for downstream dashboards and alerting.
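A minimal sketch of that rounding-based re-alignment, assuming the df from the script above (timestamp_seconds needs Spark 3.1+):

from pyspark.sql import functions as F

# Round each timestamp to the nearest minute before running the continuity checks;
# adding 30s then flooring to a 60s grid implements round-half-up at second precision.
df_rounded = df.withColumn(
    "ts_minute",
    F.timestamp_seconds(((F.col("timestamp").cast("long") + 30) / 60).cast("long") * 60))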


✅ Feature summary

Generates a data integrity validation script in one step, tailoring rules and thresholds to your dataset's characteristics
Automatically detects missing values, duplicates and outliers, with repair suggestions and sample code to cut cleaning costs
Defines required, unique and in-range fields per business column to quickly locate problem columns and records
Supports tolerances and alert levels, producing readable reports that align data quality with team standards
Adapts to many table structures and scenarios, with extensible checks that replace tedious hand-written scripts
Explains validation results in context, pointing out business risk impact and the next processing steps
Templated parameter input that teams can reuse, shortening validation and delivery from requirement to production
Slots into daily analysis workflows; each run outputs check conclusions plus an actionable fix list
Structured, clear explanations and reports for audit trails and cross-team collaboration

🎯 Problem it solves

Turns complex data integrity checking into a single reusable prompt: it generates runnable Python scripts, customizes the checks to your data's characteristics, and quickly pinpoints missing values, duplicates, outliers and rule conflicts. It helps data teams set up a standardized pre-load quality gate before reports ship, models train or data is delivered, saving effort, lowering risk and raising trust in the data; explanations can be output in Chinese or English for cross-team collaboration. A trial yields a script in three steps; the paid tier unlocks advanced validation templates, result summaries and team collaboration, upgrading data quality from "experience-based" to "rule-based".
