Popular roles are more than a source of inspiration: they are productivity aids. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs. Creation becomes easier and the value more direct.
We keep the role library updated for different user needs, so you can always find a suitable entry point for inspiration.
For different dataset characteristics and validation requirements, this role quickly generates executable Python scripts for systematic data-integrity verification. The scripts cover core checks such as missing values, duplicates, and outliers, standardize quality assessment, keep results reproducible, and provide reliable support for data cleaning, analysis, and modeling.
Below is a reusable PySpark data-integrity validation script covering the ten specified check categories. It scales to large volumes (4.8 million rows, 45 columns), supports CSV/Parquet input, applies consistent definitions in a modular implementation, and outputs a JSON report plus samples of problem rows. Business rules and enumerations can be adjusted as needed.
1. Applicable scenarios and assumptions
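The assumptions below are a summary inferred from the script defaults and can be adjusted to your environment:
- An e-commerce order table of roughly 4.8 million rows and 45 columns, ingested T+1, read from CSV or Parquet.
- Core fields include order_id, user_id, sku_id, quantity, unit_price, currency, discount, pay_amount, pay_time, ship_time, province, status, channel and is_refund; the remaining ~30 columns are passed through unchanged.
- Date boundaries default to the Asia/Shanghai timezone, while timestamp arithmetic is done in UTC.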
2. Mapping of the main checks
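The ten checks map to the report entries produced in main() (steps 3 through 11 of the script below):
- Missing value check (required columns)
- Data type consistency check (cast failures)
- Primary key uniqueness check and duplicate value check (order_id)
- Value range check (quantity, amounts, refund sign rules)
- Format conformity check (currency code, unit_price decimal places)
- Logical consistency check (ship_time >= pay_time, T+1 upper bound)
- Outlier check, statistical (IQR on quantity, unit_price, pay_amount)
- Outlier check, business rules (integer quantity, non-negative discount)
- Time-series continuity check (calendar days of pay_time)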
3. How to run
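An illustrative spark-submit invocation; the script file name and paths are placeholders, the options come from the argument parser below:

spark-submit order_integrity_checks.py \
  --input-path /data/orders/2024/ \
  --input-format parquet \
  --output-dir /data/dq_out/orders/ \
  --currencies CNY \
  --t1-lag-days 1 \
  --timezone Asia/Shanghai \
  --max-sample-rows 10000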
#!/usr/bin/env python3
import argparse
import json
import sys
from datetime import datetime, date, timedelta

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as F
from pyspark.sql import types as T
def parse_args():
    p = argparse.ArgumentParser(description="Order dataset integrity checks")
    p.add_argument("--input-path", required=True, help="Input path (CSV/Parquet)")
    p.add_argument("--input-format", required=True, choices=["csv", "parquet"], help="Input format")
    p.add_argument("--output-dir", required=True, help="Output directory for reports and samples")
    p.add_argument("--max-sample-rows", type=int, default=10000, help="Max sample rows per failed check")
    p.add_argument("--currencies", type=str, default="CNY",
                   help="Comma-separated valid currency codes (3-letter). If unknown, format check only.")
    p.add_argument("--t1-lag-days", type=int, default=1,
                   help="T+1 ingestion lag days (used to bound pay_time upper limit)")
    p.add_argument("--timezone", type=str, default="Asia/Shanghai", help="Timezone for date boundaries")
    return p.parse_args()
def build_spark():
    return (SparkSession.builder
            .appName("order_integrity_checks")
            .config("spark.sql.session.timeZone", "UTC")  # do arithmetic in UTC; adjust display if needed
            .getOrCreate())
def get_schema():
    # Use a permissive schema for robustness; cast later and track failures.
    # unit_price: 2 decimal places as a business rule; Decimal(18, 4) is used as the interim
    # type so that >2 decimals can be caught in the format check.
    return T.StructType([
        T.StructField("order_id", T.StringType(), True),
        T.StructField("user_id", T.StringType(), True),
        T.StructField("sku_id", T.StringType(), True),
        T.StructField("item_name", T.StringType(), True),
        T.StructField("quantity", T.StringType(), True),  # read as string first
        T.StructField("unit_price", T.StringType(), True),
        T.StructField("currency", T.StringType(), True),
        T.StructField("discount", T.StringType(), True),
        T.StructField("pay_amount", T.StringType(), True),
        T.StructField("pay_time", T.StringType(), True),
        T.StructField("ship_time", T.StringType(), True),
        T.StructField("province", T.StringType(), True),
        T.StructField("status", T.StringType(), True),
        T.StructField("channel", T.StringType(), True),
        T.StructField("is_refund", T.StringType(), True),
        # ... other ~30 columns kept as-is
    ])
def cast_columns(df: DataFrame) -> (DataFrame, DataFrame):
    # Keep raw columns for format checks, create typed columns for logic checks.
    # Track cast failures per column.
    # Decimal types: store as Decimal(18, 4) to safely perform rounding checks;
    # the business rule limits unit_price to 2 decimals.
    typed = (df
             .withColumn("quantity_t", F.col("quantity").cast("long"))
             .withColumn("unit_price_t", F.col("unit_price").cast(T.DecimalType(18, 4)))
             .withColumn("discount_t", F.col("discount").cast(T.DecimalType(18, 4)))
             .withColumn("pay_amount_t", F.col("pay_amount").cast(T.DecimalType(18, 4)))
             .withColumn("pay_time_t", F.to_timestamp("pay_time"))
             .withColumn("ship_time_t", F.to_timestamp("ship_time"))
             .withColumn("is_refund_t",
                         F.when(F.lower(F.col("is_refund")).isin("true", "1", "t", "yes"), F.lit(True))
                          .when(F.lower(F.col("is_refund")).isin("false", "0", "f", "no"), F.lit(False))
                          .otherwise(None)))
    # Cast error flags: original non-null but typed null
    cast_errors = []
    for orig, typed_col in [
        ("quantity", "quantity_t"),
        ("unit_price", "unit_price_t"),
        ("discount", "discount_t"),
        ("pay_amount", "pay_amount_t"),
        ("pay_time", "pay_time_t"),
        ("ship_time", "ship_time_t"),
        ("is_refund", "is_refund_t"),
    ]:
        cast_errors.append(
            F.when((F.col(orig).isNotNull()) & (F.col(typed_col).isNull()), F.lit(orig))
        )
    # array_remove cannot strip NULL entries (NULL never compares equal), so use filter() (Spark 3.1+)
    cast_err_col = F.filter(F.array(*cast_errors), lambda c: c.isNotNull())
    typed = typed.withColumn("type_error_cols", cast_err_col)
    return typed, typed.filter(F.size(F.col("type_error_cols")) > 0)
def write_sample(df: DataFrame, path: str, n: int):
    if df is None:
        return 0
    sample = df.limit(n)
    count = sample.count()
    if count > 0:
        sample.coalesce(1).write.mode("overwrite").option("header", True).csv(path)
    return count
def to_date_in_tz(col_ts, tz):
    # Convert timestamp to date in the specified timezone
    return F.to_date(F.from_utc_timestamp(col_ts, tz))
def iqr_bounds(df: DataFrame, col: str):
    # Use approximate percentiles
    q = df.selectExpr(f"percentile_approx({col}, array(0.25, 0.75), 1000) as q").collect()[0]["q"]
    if q is None or len(q) < 2:
        return None
    q1, q3 = float(q[0]), float(q[1])
    iqr = q3 - q1
    low = q1 - 1.5 * iqr
    high = q3 + 1.5 * iqr
    return low, high
def main():
    args = parse_args()
    spark = build_spark()
    spark.sparkContext.setLogLevel("WARN")
cfg = {
"valid_currencies": [c.strip() for c in args.currencies.split(",") if c.strip()],
"timezone": args.timezone,
"t1_lag_days": args.t1_lag_days,
"max_sample_rows": args.max_sample_rows
}
# 1) Load
if args.input_format == "csv":
df = (spark.read
.option("header", True)
.option("multiLine", True)
.option("escape", "\"")
.schema(get_schema())
.csv(args.input_path))
else:
df = spark.read.parquet(args.input_path)
df = df.persist()
report = {"generated_at": datetime.utcnow().isoformat() + "Z",
"checks": []}
# 2) Type casting and type errors
df_t, type_err_rows = cast_columns(df)
df_t = df_t.persist()
# Helper for adding check result
def add_check(name, status, metrics=None, sample_rel_path=None, note=None):
entry = {"name": name, "status": status}
if metrics is not None: entry["metrics"] = metrics
if sample_rel_path is not None: entry["sample"] = sample_rel_path
if note is not None: entry["note"] = note
report["checks"].append(entry)
# 3) Missing value check
required_cols = ["order_id", "user_id", "sku_id", "quantity_t",
"unit_price_t", "currency", "pay_amount_t", "pay_time_t",
"status", "channel", "is_refund_t"]
miss_metrics = {}
for c in required_cols:
    miss_metrics[c] = df_t.filter(F.col(c).isNull()).count()
missing_total = sum(miss_metrics.values())
status = "PASS" if missing_total == 0 else "FAIL"
sample_path = None
if missing_total > 0:
sample_df = None
cond = None
for c in required_cols:
cond = F.col(c).isNull() if cond is None else (cond | F.col(c).isNull())
sample_df = df_t.filter(cond).select("order_id", *required_cols)
sample_path = f"{args.output_dir}/samples/missing_required"
write_sample(sample_df, sample_path, cfg["max_sample_rows"])
add_check("缺失值检查", status, miss_metrics, sample_path)
# 4) Data type consistency check
type_err_cnt = type_err_rows.count()
type_status = "PASS" if type_err_cnt == 0 else "FAIL"
type_sample_path = None
if type_err_cnt > 0:
type_sample_path = f"{args.output_dir}/samples/type_inconsistency"
write_sample(type_err_rows.select("order_id", "type_error_cols", "quantity", "unit_price", "discount", "pay_amount", "pay_time", "ship_time", "is_refund"),
type_sample_path, cfg["max_sample_rows"])
add_check("数据类型一致性检查", type_status, {"type_error_rows": type_err_cnt}, type_sample_path)
# 5) 主键唯一性检查 + 重复值检查
dup_df = df_t.groupBy("order_id").count().filter(F.col("count") > 1)
dup_cnt = dup_df.count()
dup_status = "PASS" if dup_cnt == 0 else "FAIL"
dup_sample_path = None
if dup_cnt > 0:
dup_ids = dup_df.select("order_id").limit(cfg["max_sample_rows"])
dup_rows = df_t.join(dup_ids, on="order_id", how="inner")
dup_sample_path = f"{args.output_dir}/samples/duplicate_order_id"
write_sample(dup_rows, dup_sample_path, cfg["max_sample_rows"])
add_check("主键唯一性检查", dup_status, {"duplicate_order_ids": dup_cnt}, dup_sample_path)
add_check("重复值检查", dup_status, {"duplicate_order_ids": dup_cnt}, dup_sample_path)
# 6) 值域范围检查(硬约束)
# - quantity >= 1
# - unit_price_t >= 0
# - discount_t >= 0
# - is_refund_t=False => pay_amount_t >= 0
# - is_refund_t=True => pay_amount_t <= 0 (允许0代表全额撤销后净额为0)
# - pay_amount 不应超过原价总额(弱约束,FAIL;如有零钱抵扣只会降低,不会升高)
gross_amount = (F.col("quantity_t").cast(T.DecimalType(18, 4)) * F.col("unit_price_t"))
vr_bad = df_t.filter(
(F.col("quantity_t") < 1) |
(F.col("unit_price_t") < 0) |
(F.col("discount_t") < 0) |
((F.col("is_refund_t") == F.lit(False)) & (F.col("pay_amount_t") < 0)) |
((F.col("is_refund_t") == F.lit(True)) & (F.col("pay_amount_t") > 0)) |
(F.col("pay_amount_t") > gross_amount + F.lit(0.0001)) | # allow minimal epsilon
(F.col("discount_t") > gross_amount + F.lit(0.0001))
)
vr_cnt = vr_bad.count()
vr_status = "PASS" if vr_cnt == 0 else "FAIL"
vr_sample_path = None
if vr_cnt > 0:
vr_sample_path = f"{args.output_dir}/samples/value_range"
write_sample(vr_bad.select("order_id", "quantity_t", "unit_price_t", "discount_t", "pay_amount_t", "is_refund_t"), vr_sample_path, cfg["max_sample_rows"])
add_check("值域范围检查", vr_status, {"invalid_rows": vr_cnt}, vr_sample_path,
note="包含数量/金额非负与金额不超过原价总额的约束;考虑零钱抵扣不做等式约束。")
# 7) 格式规范性检查
# - currency: 3-letter uppercase;若提供valid_currencies,则校验集合。
# - unit_price应为2位小数以内:检查 round(unit_price, 2) == unit_price
fmt_bad_currency = None
if cfg["valid_currencies"]:
fmt_bad_currency = df_t.filter(~F.col("currency").rlike("^[A-Z]{3}$") | (~F.col("currency").isin(cfg["valid_currencies"])))
else:
fmt_bad_currency = df_t.filter(~F.col("currency").rlike("^[A-Z]{3}$"))
price_scale_bad = df_t.filter(F.col("unit_price_t").isNotNull() & (F.col("unit_price_t") != F.round(F.col("unit_price_t"), 2)))
fmt_cnt = fmt_bad_currency.count() + price_scale_bad.count()
fmt_status = "PASS" if fmt_cnt == 0 else "FAIL"
fmt_sample_path = None
if fmt_cnt > 0:
fmt_sample_path = f"{args.output_dir}/samples/format_issues"
sample_union = fmt_bad_currency.select("order_id", "currency").withColumn("issue", F.lit("currency")) \
.unionByName(price_scale_bad.select("order_id", "currency").withColumn("issue", F.lit("unit_price_scale")))
write_sample(sample_union, fmt_sample_path, cfg["max_sample_rows"])
add_check("格式规范性检查", fmt_status, {"issues": fmt_cnt}, fmt_sample_path,
note="验证货币码格式/集合与单价小数位不超过2位。")
# 8) 逻辑一致性检查
# - ship_time存在时应 >= pay_time
# - pay_time 不应晚于 当前时间 - t1_lag_days(按T+1)
# - is_refund逻辑已在值域检查校验金额符号
tz = cfg["timezone"]
today_utc = F.current_timestamp()
# 上界:当前时间减去T+1滞后
upper_bound_ts = F.expr(f"timestampadd(day, -{cfg['t1_lag_days']}, current_timestamp())")
logic_bad = df_t.filter(
((F.col("ship_time_t").isNotNull()) & (F.col("pay_time_t").isNotNull()) & (F.col("ship_time_t") < F.col("pay_time_t"))) |
(F.col("pay_time_t") > upper_bound_ts) # pay_time不应在T+1上界之后
)
logic_cnt = logic_bad.count()
logic_status = "PASS" if logic_cnt == 0 else "FAIL"
logic_sample_path = None
if logic_cnt > 0:
logic_sample_path = f"{args.output_dir}/samples/logic_inconsistency"
write_sample(logic_bad.select("order_id", "pay_time_t", "ship_time_t", "is_refund_t"), logic_sample_path, cfg["max_sample_rows"])
add_check("逻辑一致性检查", logic_status, {"invalid_rows": logic_cnt}, logic_sample_path,
note="包含ship_time>=pay_time与T+1滞后上界检查。")
# 9) 异常值检查(统计,IQR)
# 对 quantity、unit_price_t、pay_amount_t 分别计算IQR边界并识别异常
stat_metrics = {}
stat_outlier_df = None
for col in ["quantity_t", "unit_price_t", "pay_amount_t"]:
# Compute bounds only on non-null values; for pay_amount, restrict to non-refund rows so that
# negative refund amounts are not flagged as false positives (sign rules are checked elsewhere)
base_df = df_t.filter(F.col(col).isNotNull())
if col in ["unit_price_t"]:
base_df = base_df.filter(F.col(col) >= 0)
if col == "pay_amount_t":
base_df = base_df.filter(F.col("is_refund_t") == F.lit(False))
if base_df.count() == 0:
continue
bounds = iqr_bounds(base_df.withColumn(col, F.col(col).cast("double")), col)
if not bounds:
continue
low, high = bounds
outliers = df_t.filter(F.col(col).isNotNull() & ((F.col(col) < low) | (F.col(col) > high)))
cnt = outliers.count()
stat_metrics[col] = {"lower": low, "upper": high, "outliers": cnt}
if cnt > 0:
stat_outlier_df = outliers.select("order_id", F.lit(col).alias("metric"), F.col(col).alias("value")) if stat_outlier_df is None \
else stat_outlier_df.unionByName(outliers.select("order_id", F.lit(col).alias("metric"), F.col(col).alias("value")))
stat_status = "PASS" if all(m["outliers"] == 0 for m in stat_metrics.values()) else "WARN"
stat_sample_path = None
if stat_outlier_df is not None:
stat_sample_path = f"{args.output_dir}/samples/stat_outliers"
write_sample(stat_outlier_df, stat_sample_path, cfg["max_sample_rows"])
add_check("异常值检查(基于统计)", stat_status, stat_metrics, stat_sample_path,
note="IQR方法;节假日销量峰值不在本检查范围(针对行级字段)。")
# 10) 异常值检查(基于业务规则)
# - pay_amount + discount <= quantity*unit_price + ε(允许抵扣导致更低;不允许超过)
# - 对退款单:允许 ship_time 有/无;不做更严金额等式约束
# 已在值域范围检查覆盖金额关系;此处补充负折扣、非整数数量、异常币种占比等业务风险提示
br_bad = df_t.filter(
(F.col("quantity_t").cast("double") % 1 != 0) | # 非整数数量
(F.col("discount_t") < 0)
)
br_cnt = br_bad.count()
br_status = "PASS" if br_cnt == 0 else "FAIL"
br_sample_path = None
if br_cnt > 0:
br_sample_path = f"{args.output_dir}/samples/business_rule_anomalies"
write_sample(br_bad.select("order_id", "quantity_t", "discount_t"), br_sample_path, cfg["max_sample_rows"])
add_check("异常值检查(基于业务规则)", br_status, {"invalid_rows": br_cnt}, br_sample_path,
note="补充业务约束:数量需为整数、折扣不为负;金额关系已在值域范围检查中验证。")
# 11) 时间序列连续性检查(按pay_time的自然日)
date_col = to_date_in_tz(F.col("pay_time_t"), cfg["timezone"]).alias("pay_date")
dfd = df_t.select(date_col).filter(F.col("pay_date").isNotNull())
if dfd.count() > 0:
min_date, max_date = dfd.agg(F.min("pay_date"), F.max("pay_date")).first()
# Generate the full date range
all_dates = spark.range(0, (max_date - min_date).days + 1) \
.withColumn("pay_date", F.expr(f"date_add(to_date('{min_date}'), cast(id as int))")) \
.select("pay_date")
present = dfd.groupBy("pay_date").count().select("pay_date")
missing = all_dates.join(present, on="pay_date", how="left_anti")
missing_cnt = missing.count()
ts_status = "PASS" if missing_cnt == 0 else "WARN"
miss_sample_path = None
if missing_cnt > 0:
miss_sample_path = f"{args.output_dir}/samples/missing_dates"
write_sample(missing, miss_sample_path, cfg["max_sample_rows"])
add_check("时间序列连续性检查", ts_status,
{"min_date": str(min_date), "max_date": str(max_date), "missing_days": missing_cnt},
miss_sample_path,
note="按pay_time日期连续性;T+1采集不影响日期序列。")
else:
add_check("时间序列连续性检查", "FAIL", {"reason": "无有效pay_time记录"})
# 12) 输出JSON报告
report_path = f"{args.output_dir}/integrity_report.json"
with open("/tmp/integrity_report.json", "w", encoding="utf-8") as f:
json.dump(report, f, ensure_ascii=False, indent=2)
# 保存报告到分布式存储
spark.createDataFrame([(json.dumps(report, ensure_ascii=False),)], ["json"]) \
.coalesce(1).write.mode("overwrite").text(report_path)
# 13) 结束
print(json.dumps(report, ensure_ascii=False))
spark.stop()
5. Notes and tunable items
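The main tunables, all surfaced in the code above, include: the currency whitelist (--currencies), the T+1 ingestion lag (--t1-lag-days), the reporting timezone (--timezone, default Asia/Shanghai), the per-check sample size (--max-sample-rows), the epsilon (0.0001) used in amount comparisons, the 1.5 IQR multiplier, and the required-column list and refund sign conventions defined in main().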
6. Suggestions for further extension
The script covers the ten required integrity check categories, produces auditable reports and sample outputs, and avoids unreasonable false positives given the business characteristics (holiday peaks, instalments, wallet-credit deductions).
Below is a reusable PySpark data-quality validation script for the online loan-application dataset described above, covering the ten required check categories. It is configurable, extensible, switchable between batch and streaming, and outputs structured validation results plus sample details.
Suggested script file name: dq_loan_app_validation.py
Dependencies:
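Judging from the imports below, PySpark 3.x (with a matching Python 3 runtime) is the only dependency beyond the standard library, for example installed via pip install pyspark.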
Usage example (batch):
spark-submit dq_loan_app_validation.py \
  --input /data/loan_apps/2024/ \
  --input-format parquet \
  --output /data/dq_out/loan_apps/ \
  --streaming false
Code:
#!/usr/bin/env python3
""" 贷款申请数据质量验证脚本 覆盖校验项:
说明:
import argparse
import json
import sys
from datetime import datetime, timedelta

from pyspark.sql import SparkSession, DataFrame, Window
from pyspark.sql import functions as F
from pyspark.sql import types as T
CONFIG = { "primary_key": "application_id", "timestamp_col": "app_datetime", "id_type_col": "id_type", "id_number_col": "id_number", # 已脱敏哈希 "core_numeric_cols": [ "age", "income_monthly", "debt_total", "loan_amount", "loan_term_months", "employment_length_years", "credit_score" ], "categorical_cols": [ "gender", "province", "marital_status", "education_level", "channel" ], "string_cols": ["device_id", "ip"], # 目标类型定义(统一类型) "schema_expectation": { "application_id": "string", "app_datetime": "timestamp", "id_type": "string", "id_number": "string", "age": "int", "gender": "string", "province": "string", "income_monthly": "double", "debt_total": "double", "loan_amount": "double", "loan_term_months": "int", "employment_length_years": "double", "marital_status": "string", "education_level": "string", "credit_score": "double", "device_id": "string", "ip": "string", "channel": "string" }, # 值域/枚举(如未知可留空,或仅做存在性检查) "enums": { "id_type": ["ID_CARD", "PASSPORT", "OTHER"], "gender": ["M", "F", "U", "UNKNOWN", "OTHER"], "marital_status": ["SINGLE", "MARRIED", "DIVORCED", "WIDOWED", "OTHER"], "education_level": ["HIGH_SCHOOL", "BACHELOR", "MASTER", "PHD", "OTHER"], "channel": [] # 若为空则不限制;否则精确匹配白名单 }, # 数值基本范围(保守默认,可根据历史数据调整;为None则跳过静态范围) "ranges": { "age": {"min": 18, "max": 75}, "loan_term_months": {"min": 1, "max": 120}, "employment_length_years": {"min": 0, "max": 60}, "income_monthly": {"min": 0, "max": None}, "debt_total": {"min": 0, "max": None}, "loan_amount": {"min": 0, "max": None}, "credit_score": {"min": 0, "max": None} }, # 业务规则(可配置开关与阈值) "business_rules": { # 时间不超过未来5分钟,且不早于今天-400天(约13个月) "app_datetime_future_minutes": 5, "app_datetime_past_days": 400, # DTI = debt_total/income_monthly:静态阈值与分位阈值取较宽松者 "max_dti_static": 5.0, # 若income=0则DTI设为null并记录异常 # 年龄与工作年限:age >= employment_length_years + 14(假设最早工作年龄14岁) "min_age_minus_employment_years": 14, # 重复申请:同(id_number, loan_amount, loan_term_months) 7天内重复视为重复 "repeat_within_days": 7 }, # 格式与规范 "format_checks": { # IPv4正则 "ipv4_regex": r"^(?:(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d).){3}(?:25[0-5]|2[0-4]\d|1\d{2}|[1-9]?\d)$", # 哈希长度白名单(常见:MD5=32, SHA-1=40, SHA-256=64),未知时可放宽 "id_hash_lengths": [32, 40, 64], # device_id一般为字母数字与-,长度放宽 "device_id_regex": r"^[A-Za-z0-9-:.]{8,100}$", # 允许的省份列表若为空不校验 "province_whitelist": [] }, # 统计异常检测(IQR):乘数1.5为常规,3为极端 "iqr_multiplier": 1.5, "extreme_iqr_multiplier": 3.0, # 输出采样 "sample_rows": 100 }
def spark_type_from_str(t: str):
    m = {
        "string": T.StringType(), "int": T.IntegerType(), "bigint": T.LongType(),
        "double": T.DoubleType(), "float": T.FloatType(), "boolean": T.BooleanType(),
        "timestamp": T.TimestampType(), "date": T.DateType()
    }
    return m.get(t, T.StringType())
def ensure_columns(df: DataFrame, cols):
    exists = [c for c in cols if c in df.columns]
    missing = [c for c in cols if c not in df.columns]
    return exists, missing
def cast_with_flag(df: DataFrame, col: str, to_type: str):
    target_type = spark_type_from_str(to_type)
    cast_col = F.col(col).cast(target_type)
    # A cast failed if the source value is non-null but the casted value is null
    ok_col = (F.when(F.col(col).isNull(), F.lit(True))
               .when(cast_col.isNull() & F.col(col).isNotNull(), F.lit(False))
               .otherwise(F.lit(True)))
    # Add the flag first, while the raw value is still available, then overwrite the column
    df2 = (df.withColumn(f"{col}__cast_ok", ok_col)
             .withColumn(col, cast_col))
    return df2
def unify_types(df: DataFrame, schema_expectation: dict):
    df2 = df
    present = [c for c in schema_expectation.keys() if c in df.columns]
    for c in present:
        df2 = cast_with_flag(df2, c, schema_expectation[c])
    return df2
def approx_iqr_bounds(df: DataFrame, col: str, multiplier: float):
    q = df.approxQuantile(col, [0.25, 0.75], 0.001)
    if not q or len(q) < 2:
        return None
    q1, q3 = q
    if q1 is None or q3 is None:
        return None
    iqr = q3 - q1
    lower = q1 - multiplier * iqr
    upper = q3 + multiplier * iqr
    return lower, upper
def write_df(df: DataFrame, path: str):
    if df is None:
        return
    df.coalesce(1).write.mode("overwrite").parquet(path)
def check_missingness(df: DataFrame, cfg: dict) -> DataFrame:
    cols = [c for c in df.columns if not c.endswith("__cast_ok")]
    exprs = [F.sum(F.col(c).isNull().cast("int")).alias(c) for c in cols]
    miss = df.agg(*exprs)
    # Pivot the single aggregate row into (col, missing_count) pairs
    kv_pairs = sum(([F.lit(c), F.col(c)] for c in miss.columns), [])
    miss_long = miss.select(F.explode(F.create_map(*kv_pairs)).alias("col", "missing_count"))
    miss_long = (miss_long
                 .withColumn("total_rows", F.lit(df.count()))
                 .withColumn("missing_rate", F.col("missing_count") / F.col("total_rows")))
    return miss_long
def check_primary_key_uniqueness(df: DataFrame, pk: str) -> DataFrame:
    if pk not in df.columns:
        return None
    dup = df.groupBy(pk).count().where(F.col("count") > 1)
    return dup
def check_exact_duplicates(df: DataFrame) -> DataFrame:
    # Fully identical rows (all columns equal)
    grp_cols = df.columns
    dup = df.groupBy(*grp_cols).count().where(F.col("count") > 1)
    return dup
def check_semantic_duplicates(df: DataFrame, cfg: dict) -> DataFrame:
    # Repeat applications within 7 days: same (id_number, loan_amount, loan_term_months)
needed = ["id_number", "loan_amount", "loan_term_months", "app_datetime"]
exists, _ = ensure_columns(df, needed)
if len(exists) < 4:
return None
w = Window.partitionBy("id_number", "loan_amount", "loan_term_months").orderBy(F.col("app_datetime").cast("timestamp"))
prev_ts = F.lag("app_datetime").over(w)
seconds_diff = F.when(prev_ts.isNotNull(), F.col("app_datetime").cast("long") - prev_ts.cast("long")).otherwise(None)
within = df.withColumn("prev_app_datetime", prev_ts)
.withColumn("seconds_from_prev", seconds_diff)
.withColumn("repeat_within_days",
(F.col("seconds_from_prev").isNotNull()) &
(F.col("seconds_from_prev") <= F.lit(cfg["business_rules"]["repeat_within_days"] * 86400)))
return within.where(F.col("repeat_within_days") == True)
def check_type_consistency_flags(df: DataFrame) -> DataFrame:
    cast_cols = [c for c in df.columns if c.endswith("__cast_ok")]
    if not cast_cols:
        return None
    bad = []
    for c in cast_cols:
        src = c.replace("__cast_ok", "")
        bad.append(F.sum((~F.col(c)).cast("int")).alias(src))
    out = df.agg(*bad)
    # Pivot the single aggregate row into (col, failed_casts) pairs
    kv_pairs = sum(([F.lit(c), F.col(c)] for c in out.columns), [])
    out_long = out.select(F.explode(F.create_map(*kv_pairs)).alias("col", "failed_casts"))
    return out_long
def check_value_ranges(df: DataFrame, ranges: dict) -> DataFrame:
violations = None
for col, r in ranges.items():
if col not in df.columns:
continue
conds = []
if r.get("min") is not None:
conds.append(F.col(col) < F.lit(r["min"]))
if r.get("max") is not None:
conds.append(F.col(col) > F.lit(r["max"]))
if not conds:
continue
        # value is stringified so that columns of different numeric types union cleanly
        v = (df.where(conds[0] if len(conds) == 1 else (conds[0] | conds[1]))
               .select(F.lit(col).alias("col"), F.col(col).cast("string").alias("value"),
                       *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]))
        violations = v if violations is None else violations.unionByName(v)
return violations
def check_enums(df: DataFrame, enums: dict) -> DataFrame:
violations = None
for col, allowed in enums.items():
if col not in df.columns or not allowed:
continue
        v = (df.where(~F.col(col).isin(allowed) & F.col(col).isNotNull())
               .select(F.lit(col).alias("col"), F.col(col).alias("value"),
                       *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]))
        violations = v if violations is None else violations.unionByName(v)
return violations
def check_formats(df: DataFrame, cfg: dict) -> DataFrame:
    viols = None
    pk_cols = [F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]
    # IP format
    if "ip" in df.columns:
        v1 = (df.where(F.col("ip").isNotNull() & (~F.col("ip").rlike(cfg["format_checks"]["ipv4_regex"])))
                .select(F.lit("ip").alias("col"), F.col("ip").alias("value"), *pk_cols))
        viols = v1 if viols is None else viols.unionByName(v1)
    # id_number hash length
    if CONFIG["id_number_col"] in df.columns:
        v2 = (df.where(F.col(CONFIG["id_number_col"]).isNotNull() &
                       (~F.length(F.col(CONFIG["id_number_col"])).isin(cfg["format_checks"]["id_hash_lengths"])))
                .select(F.lit(CONFIG["id_number_col"]).alias("col"),
                        F.col(CONFIG["id_number_col"]).alias("value"), *pk_cols))
        viols = v2 if viols is None else viols.unionByName(v2)
    # device_id pattern
    if "device_id" in df.columns:
        v3 = (df.where(F.col("device_id").isNotNull() &
                       (~F.col("device_id").rlike(cfg["format_checks"]["device_id_regex"])))
                .select(F.lit("device_id").alias("col"), F.col("device_id").alias("value"), *pk_cols))
        viols = v3 if viols is None else viols.unionByName(v3)
    # Province whitelist
    if "province" in df.columns and cfg["format_checks"]["province_whitelist"]:
        v4 = (df.where(F.col("province").isNotNull() &
                       (~F.col("province").isin(cfg["format_checks"]["province_whitelist"])))
                .select(F.lit("province").alias("col"), F.col("province").alias("value"), *pk_cols))
        viols = v4 if viols is None else viols.unionByName(v4)
    return viols
def check_business_rules(df: DataFrame, cfg: dict) -> DataFrame:
    br = cfg["business_rules"]
    viols = None
    pk_cols = [F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]
    # Values are stringified so that rules with different value types can be unioned
    # Time window: no later than now + future_minutes, no earlier than now - past_days
    if CONFIG["timestamp_col"] in df.columns:
        v1 = (df.where(
                  (F.col(CONFIG["timestamp_col"]) > F.expr(f"timestampadd(MINUTE, {br['app_datetime_future_minutes']}, current_timestamp())")) |
                  (F.col(CONFIG["timestamp_col"]) < F.expr(f"timestampadd(DAY, -{br['app_datetime_past_days']}, current_timestamp())")))
              .select(F.lit(CONFIG["timestamp_col"]).alias("rule"),
                      F.col(CONFIG["timestamp_col"]).cast("string").alias("value"), *pk_cols))
        viols = v1 if viols is None else viols.unionByName(v1)
    # DTI = debt_total / income_monthly
    if "debt_total" in df.columns and "income_monthly" in df.columns:
        dti = df.withColumn("DTI",
                            F.when(F.col("income_monthly") > 0, F.col("debt_total") / F.col("income_monthly"))
                             .otherwise(F.lit(None)))
        v2 = (dti.where(F.col("DTI").isNotNull() & (F.col("DTI") > F.lit(br["max_dti_static"])))
                 .select(F.lit("DTI_gt_static").alias("rule"),
                         F.col("DTI").cast("string").alias("value"), *pk_cols))
        viols = v2 if viols is None else viols.unionByName(v2)
        # Anomaly: income is zero (or negative) but debt is positive
        v2b = (df.where((F.col("income_monthly") <= 0) & (F.col("debt_total") > 0))
                 .select(F.lit("income_zero_but_debt_positive").alias("rule"),
                         F.to_json(F.struct("income_monthly", "debt_total")).alias("value"), *pk_cols))
        viols = v2b if viols is None else viols.unionByName(v2b)
    # Age versus employment length
    if "age" in df.columns and "employment_length_years" in df.columns:
        v3 = (df.where(F.col("age") < (F.col("employment_length_years") + F.lit(br["min_age_minus_employment_years"])))
                .select(F.lit("age_vs_employment_years").alias("rule"),
                        F.to_json(F.struct("age", "employment_length_years")).alias("value"), *pk_cols))
        viols = v3 if viols is None else viols.unionByName(v3)
    # Loan amount must be positive
    if "loan_amount" in df.columns:
        v4 = (df.where(F.col("loan_amount") <= 0)
                .select(F.lit("loan_amount_positive").alias("rule"),
                        F.col("loan_amount").cast("string").alias("value"), *pk_cols))
        viols = v4 if viols is None else viols.unionByName(v4)
    return viols
def check_statistical_outliers(df: DataFrame, cfg: dict) -> DataFrame:
violations = None
for col in cfg["core_numeric_cols"]:
if col not in df.columns:
continue
bounds = approx_iqr_bounds(df.where(F.col(col).isNotNull()), col, cfg["iqr_multiplier"])
if not bounds:
continue
lower, upper = bounds
        # value is cast to double so that int and double metrics union cleanly
        v = (df.where((F.col(col) < F.lit(lower)) | (F.col(col) > F.lit(upper)))
               .select(F.lit(col).alias("col"),
                       F.col(col).cast("double").alias("value"),
                       F.lit(lower).alias("iqr_lower"),
                       F.lit(upper).alias("iqr_upper"),
                       *[F.col(c) for c in [CONFIG["primary_key"]] if c in df.columns]))
        violations = v if violations is None else violations.unionByName(v)
return violations
def check_time_continuity(df: DataFrame, cfg: dict) -> (DataFrame, DataFrame):
    # Continuity check: aggregate per day/hour and find gaps
    ts = CONFIG["timestamp_col"]
    if ts not in df.columns:
        return None, None
    # Aggregations
    daily = df.groupBy(F.to_date(F.col(ts)).alias("date")).count()
    hourly = df.groupBy(F.date_format(F.col(ts), "yyyy-MM-dd HH:00:00").alias("hour")).count()
    # Generate the full date range
min_max = df.agg(F.min(F.to_date(F.col(ts))).alias("min_d"),
F.max(F.to_date(F.col(ts))).alias("max_d")).collect()[0]
if min_max["min_d"] is None or min_max["max_d"] is None:
return daily, hourly
min_d = min_max["min_d"]
max_d = min_max["max_d"]
    spark = df.sql_ctx.sparkSession
    num_days = (max_d - min_d).days  # min_d/max_d are Python date objects, so compute the span driver-side
    dates = (spark.range(0, num_days + 1)
                  .withColumn("date", F.expr(f"date_add(to_date('{min_d}'), cast(id as int))"))
                  .select("date"))
daily_full = dates.join(daily, on="date", how="left").fillna({"count": 0})
missing_days = daily_full.where(F.col("count") == 0)
    # Hourly series (aligned to whole hours)
    # Compute the hour range
min_max_h = df.agg(F.min(F.date_trunc("hour", F.col(ts))).alias("min_h"),
F.max(F.date_trunc("hour", F.col(ts))).alias("max_h")).collect()[0]
if min_max_h["min_h"] is None or min_max_h["max_h"] is None:
return daily_full, hourly
min_h = min_max_h["min_h"]
max_h = min_max_h["max_h"]
hours = spark.range(0, (int((max_h.timestamp() - min_h.timestamp()) // 3600) + 1))\
.withColumn("hour", F.expr(f"timestampadd(HOUR, cast(id as int), timestamp('{min_h}'))"))\
.select("hour")
hourly_full = hours.join(hourly.withColumn("hour", F.col("hour").cast("timestamp")), on="hour", how="left").fillna({"count": 0})
missing_hours = hourly_full.where(F.col("count") == 0)
    # Return the gaps (missing days and missing hours)
return missing_days, missing_hours
def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True, help="Input path (directory or file)")
    parser.add_argument("--input-format", default="parquet", choices=["parquet", "csv", "json"], help="Input format")
    parser.add_argument("--output", required=True, help="Output directory")
    parser.add_argument("--streaming", default="false", choices=["true", "false"], help="Whether to use Structured Streaming")
    parser.add_argument("--csv-header", default="true", choices=["true", "false"], help="Whether the CSV has a header row")
    args = parser.parse_args()
spark = SparkSession.builder.appName("DQ_Loan_Applications").getOrCreate()
spark.sparkContext.setLogLevel("WARN")
# Load input
if args.streaming == "true":
    # Streaming mode (illustrative): treats the folder as a source and runs micro-batches.
    # Note: some aggregations (e.g. approxQuantile) require separating batch and streaming layers.
if args.input_format == "parquet":
df = spark.readStream.format("parquet").load(args.input)
elif args.input_format == "csv":
df = spark.readStream.option("header", args.csv_header == "true").option("inferSchema", "true").csv(args.input)
else:
df = spark.readStream.json(args.input)
    # Simplified: streaming mode would only apply record-level rule checks here;
    # statistical/IQR checks are better run as offline batch jobs.
    print("Note: in streaming mode, statistical/IQR checks should be run offline in batch; "
          "this script only demonstrates the structure and can be layered in production.")
sys.exit(0)
else:
if args.input_format == "parquet":
df = spark.read.parquet(args.input)
elif args.input_format == "csv":
df = spark.read.option("header", args.csv_header == "true").option("inferSchema", "true").csv(args.input)
else:
df = spark.read.json(args.input)
# Unify types and flag failed casts
df = unify_types(df, CONFIG["schema_expectation"])
# 1) Missing value check
missing_report = check_missingness(df, CONFIG)
write_df(missing_report, f"{args.output}/missing_report")
# 2) Duplicate check
pk_dups = check_primary_key_uniqueness(df, CONFIG["primary_key"])
write_df(pk_dups, f"{args.output}/primary_key_duplicates")
exact_dups = check_exact_duplicates(df)
write_df(exact_dups, f"{args.output}/exact_duplicates")
semantic_dups = check_semantic_duplicates(df, CONFIG)
write_df(semantic_dups, f"{args.output}/semantic_duplicates")
# 3) Statistical outliers (IQR)
stat_outliers = check_statistical_outliers(df, CONFIG)
write_df(stat_outliers, f"{args.output}/statistical_outliers")
# 4) Business-rule violations
biz_violations = check_business_rules(df, CONFIG)
write_df(biz_violations, f"{args.output}/business_rule_violations")
# 5) Type consistency check (failed cast counts)
type_cast_report = check_type_consistency_flags(df)
write_df(type_cast_report, f"{args.output}/type_cast_report")
# 6) Primary-key uniqueness (detailed duplicates already written in step 2); record the status here
if pk_dups is not None:
pk_dup_count = pk_dups.count()
else:
pk_dup_count = None
# 7) Value range check (numeric ranges)
range_violations = check_value_ranges(df, CONFIG["ranges"])
write_df(range_violations, f"{args.output}/range_violations")
# 8) Format conformity check
format_violations = check_formats(df, CONFIG)
write_df(format_violations, f"{args.output}/format_violations")
# 9) Logical consistency check (the main cross-field logic is covered by the business rules)
# Extend check_business_rules if additional logic is needed
# 10) Time-series continuity check
missing_days, missing_hours = check_time_continuity(df, CONFIG)
write_df(missing_days, f"{args.output}/missing_days")
write_df(missing_hours, f"{args.output}/missing_hours")
# Build summary metrics
total_rows = df.count()
summary_rows = []
def cnt(df_):
return None if df_ is None else df_.count()
summary_rows.append(("total_rows", total_rows))
summary_rows.append(("missing_report_cols", cnt(missing_report)))
summary_rows.append(("primary_key_dup_count", cnt(pk_dups)))
summary_rows.append(("exact_duplicates_groups", cnt(exact_dups)))
summary_rows.append(("semantic_duplicates_rows", cnt(semantic_dups)))
summary_rows.append(("statistical_outliers_rows", cnt(stat_outliers)))
summary_rows.append(("business_rule_violations_rows", cnt(biz_violations)))
summary_rows.append(("type_cast_failed_cols", cnt(type_cast_report)))
summary_rows.append(("range_violations_rows", cnt(range_violations)))
summary_rows.append(("format_violations_rows", cnt(format_violations)))
summary_rows.append(("missing_days", cnt(missing_days)))
summary_rows.append(("missing_hours", cnt(missing_hours)))
summary_df = spark.createDataFrame(summary_rows, schema=["metric", "value"])
write_df(summary_df, f"{args.output}/summary_metrics")
# Export samples of each violation type (for quick triage)
sample_targets = {
"statistical_outliers": stat_outliers,
"business_rule_violations": biz_violations,
"range_violations": range_violations,
"format_violations": format_violations,
"semantic_duplicates": semantic_dups,
"primary_key_duplicates": pk_dups
}
for name, d in sample_targets.items():
if d is not None and d.count() > 0:
write_df(d.limit(CONFIG["sample_rows"]), f"{args.output}/samples/{name}")
print("DQ校验完成。汇总指标:")
summary_df.show(truncate=False)
spark.stop()
if name == "main": main()
Implementation and extension notes:
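A few notes inferred from the script itself: CONFIG is the single place to adjust enumerations, ranges, business rules and format patterns; streaming mode is only a structural stub, and the statistical/IQR checks should stay in batch; all outputs are Parquet directories under --output, including summary_metrics and the per-check samples/ subdirectories.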
Below is a scalable data-integrity validation script for massive minute-level industrial sensor data (roughly 800 million rows over 90 days). It uses PySpark and covers the following checks:
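As implemented below: missing values; data type and format conformity (status enum, device_id pattern, firmware semver); duplicates on the (device_id, timestamp) primary key and on full rows; physical value ranges; sampling gaps, retransmissions and minute alignment; timestamp plausibility bounds; per-device statistical outliers (IQR); rate-of-change business rules; firmware downgrades and pre/post-upgrade metric consistency; and status-versus-vibration logic consistency.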
Notes and assumptions:
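Assumptions, taken from the CONFIG defaults and adjustable there: timestamps are in UTC and sampled every 60 seconds, aligned to whole minutes; the primary key is (device_id, timestamp); input is a Delta table or path (Parquet and metastore tables are also supported); results are written as Delta tables under output.base_path; the physical ranges and rate limits are illustrative defaults that should be confirmed on site.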
Script (PySpark 3.x):
from functools import reduce  # used to OR together per-metric conditions

from pyspark.sql import SparkSession, functions as F, types as T, Window as W
CONFIG = { "input": { "format": "delta", # 可选:delta/parquet/table "path_or_table": "db.sensor_delta" # 路径或表名 }, "output": { "base_path": "dbfs:/tmp/quality_checks", # 或 HDFS/S3 路径 "save_mode": "overwrite" # overwrite/append }, "spark": { "shuffle_partitions": 2000, "adaptive_enabled": "true" }, "schema": { "device_id": T.StringType(), "timestamp": T.TimestampType(), # UTC "temperature": T.DoubleType(), # ℃ "humidity": T.DoubleType(), # %RH "vibration_rms": T.DoubleType(), # g "status": T.StringType(), # 枚举 "firmware_version": T.StringType(), "line_id": T.StringType(), "site": T.StringType() }, "primary_key": ["device_id", "timestamp"], "sampling": { "interval_seconds": 60, "expect_minute_aligned": True, # 时间戳是否应对齐至整分钟(秒=0) "alignment_tolerance_seconds": 0 # 若允许抖动可设为 >0 }, "domain_rules": { "status_enum": ["RUNNING","IDLE","MAINTENANCE","OFF","ERROR"], "device_id_regex": r"^[A-Za-z0-9_-.]+$", "firmware_semver_regex": r"^\d+.\d+.\d+(?:-[A-Za-z0-9]+)?$", "temperature_range": [-40.0, 125.0], # ℃(示例) "humidity_range": [0.0, 100.0], # %RH(物理边界) "vibration_rms_range": [0.0, 50.0], # g(示例,上限请现场确认) "rate_limits_per_min": { # 每分钟变化率上限(示例) "temperature": 10.0, # ℃/min "humidity": 20.0, # %RH/min "vibration_rms": 5.0 # g/min } }, "stat_outlier": { "use_iqr": True, "iqr_k": 3.0, # IQR*3 之外视为异常 "group_level": "device_id", # 以设备为组做分位统计 "approx_pct": [0.25, 0.5, 0.75], "rel_error": 1e-3 }, "time_bounds": { # 时间合理性(可选) "min_days_ago": 120, # 不早于当前时间-120天 "max_hours_ahead": 1 # 不晚于当前时间+1小时 }, "firmware_consistency_check": { "enabled": True, "pre_window_minutes": 60, "post_window_minutes": 60, "status_for_analysis": ["RUNNING"], # 仅在运行工况下对比 "ratio_tolerance": 0.15, # 核心指标中位数前后比偏离阈值(15%) "metrics": ["temperature", "humidity", "vibration_rms"] } }
spark = SparkSession.builder.appName("SensorDataQualityChecks").getOrCreate() spark.conf.set("spark.sql.shuffle.partitions", str(CONFIG["spark"]["shuffle_partitions"])) spark.conf.set("spark.sql.adaptive.enabled", CONFIG["spark"]["adaptive_enabled"])
expected_schema = T.StructType([T.StructField(k, v, True) for k, v in CONFIG["schema"].items()])
input_cfg = CONFIG["input"] if input_cfg["format"] == "table": df_raw = spark.table(input_cfg["path_or_table"]) elif input_cfg["format"] in ("delta", "parquet"): df_raw = spark.read.format(input_cfg["format"]).schema(expected_schema).load(input_cfg["path_or_table"]) else: raise ValueError("Unsupported input format")
df = df_raw.select([F.col(c) for c in CONFIG["schema"].keys()])
for c, t in CONFIG["schema"].items(): df = df.withColumn(c, F.col(c).cast(t))
df = df.withColumn("unix_ts", F.col("timestamp").cast("long"))
missing_counts = df.select([F.sum(F.col(c).isNull().cast("bigint")).alias(c) for c in df.columns])
missing_counts.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/missing_counts")
status_ok = F.col("status").isin(CONFIG["domain_rules"]["status_enum"]) device_id_ok = F.col("device_id").rlike(CONFIG["domain_rules"]["device_id_regex"]) fw_ok = F.col("firmware_version").rlike(CONFIG["domain_rules"]["firmware_semver_regex"]) | F.col("firmware_version").isNull()
dtype_violations = df.where(~status_ok | ~device_id_ok | ~fw_ok)
dtype_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/dtype_format_violations")
pk = CONFIG["primary_key"] dups = df.groupBy(*pk).count().where(F.col("count") > 1) dups.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/duplicate_pk")
row_hash = F.sha2(F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("NULL")) for c in df.columns]), 256)
df_with_hash = df.withColumn("row_hash", row_hash)
full_dups = df_with_hash.groupBy("row_hash").count().where(F.col("count") > 1)
full_dups.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/duplicate_rows")
r = CONFIG["domain_rules"] range_violations = df.where( (F.col("temperature").isNotNull() & ((F.col("temperature") < F.lit(r["temperature_range"][0])) | (F.col("temperature") > F.lit(r["temperature_range"][1])))) | (F.col("humidity").isNotNull() & ((F.col("humidity") < F.lit(r["humidity_range"][0])) | (F.col("humidity") > F.lit(r["humidity_range"][1])))) | (F.col("vibration_rms").isNotNull() & ((F.col("vibration_rms") < F.lit(r["vibration_rms_range"][0])) | (F.col("vibration_rms") > F.lit(r["vibration_rms_range"][1])))) ) range_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/range_violations")
w_dev_time = W.partitionBy("device_id").orderBy("timestamp")
df_seq = (df.withColumn("prev_ts", F.lag("timestamp").over(w_dev_time))
            .withColumn("prev_unix", F.col("prev_ts").cast("long"))
            .withColumn("delta_sec", F.col("unix_ts") - F.col("prev_unix")))
interval = CONFIG["sampling"]["interval_seconds"]
gaps = (df_seq.where(F.col("prev_ts").isNotNull() & (F.col("delta_sec") > interval))
              .withColumn("missing_points", (F.col("delta_sec") / interval - F.lit(1)).cast("bigint")))
gaps.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/time_gaps")
retransmissions = df_seq.where(F.col("prev_ts").isNotNull() & (F.col("delta_sec") == 0))
retransmissions.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/retransmissions")
if CONFIG["sampling"]["expect_minute_aligned"]: aligned = df.withColumn("sec_in_min", (F.col("unix_ts") % 60).cast("int")) misaligned = aligned.where(F.abs(F.col("sec_in_min")) > CONFIG["sampling"]["alignment_tolerance_seconds"]) misaligned.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/misaligned_timestamps")
upper_bound = F.expr(f"timestampadd(HOUR, {CONFIG['time_bounds']['max_hours_ahead']}, current_timestamp())")
lower_bound = F.expr(f"timestampadd(DAY, -{CONFIG['time_bounds']['min_days_ago']}, current_timestamp())")
time_violations = df.where((F.col("timestamp") < lower_bound) | (F.col("timestamp") > upper_bound))
time_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/time_violations")
pct = CONFIG["stat_outlier"]["approx_pct"] rel_err = CONFIG["stat_outlier"]["rel_error"] group_col = CONFIG["stat_outlier"]["group_level"] iqr_k = CONFIG["stat_outlier"]["iqr_k"]
def iqr_bounds(df_in, col):
stats = df_in.groupBy(group_col).agg(
F.expr(f"percentile_approx({col}, array({pct[0]},{pct[1]},{pct[2]}), {int(1/rel_err)})").alias("p")
).select(
F.col(group_col),
F.col("p").getItem(0).alias("q1"),
F.col("p").getItem(1).alias("q2"),
F.col("p").getItem(2).alias("q3")
).withColumn("iqr", F.col("q3") - F.col("q1"))
.withColumn("lo", F.col("q1") - F.lit(iqr_k)*F.col("iqr"))
.withColumn("hi", F.col("q3") + F.lit(iqr_k)*F.col("iqr"))
return stats
metrics = ["temperature","humidity","vibration_rms"] outlier_union = None
for m in metrics:
stats_m = iqr_bounds(df.where(F.col(m).isNotNull()), m)
    flagged = (df.join(stats_m, on=group_col, how="left")
                 .where(F.col(m).isNotNull() & ((F.col(m) < F.col("lo")) | (F.col(m) > F.col("hi"))))
                 .select("device_id", "timestamp", F.lit(m).alias("metric"), F.col(m).alias("value"),
                         "lo", "hi", "q1", "q2", "q3"))
outlier_union = flagged if outlier_union is None else outlier_union.unionByName(flagged)
if outlier_union is not None: outlier_union.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/stat_outliers")
rate = CONFIG["domain_rules"]["rate_limits_per_min"]
df_rate = df.select("device_id","timestamp","temperature","humidity","vibration_rms")
.withColumn("prev_temperature", F.lag("temperature").over(w_dev_time))
.withColumn("prev_humidity", F.lag("humidity").over(w_dev_time))
.withColumn("prev_vibration_rms", F.lag("vibration_rms").over(w_dev_time))
.withColumn("prev_ts", F.lag("timestamp").over(w_dev_time))
.withColumn("delta_min", (F.col("unix_ts") - F.col("prev_ts").cast("long"))/60.0)
biz_violations = df_rate.where(F.col("prev_ts").isNotNull() & (F.col("delta_min") > 0)).select( "device_id","timestamp","delta_min", (F.abs(F.col("temperature")-F.col("prev_temperature"))/F.col("delta_min") > F.lit(rate["temperature"])).alias("temp_rate_flag"), (F.abs(F.col("humidity")-F.col("prev_humidity"))/F.col("delta_min") > F.lit(rate["humidity"])).alias("hum_rate_flag"), (F.abs(F.col("vibration_rms")-F.col("prev_vibration_rms"))/F.col("delta_min") > F.lit(rate["vibration_rms"])).alias("vib_rate_flag"), "temperature","prev_temperature","humidity","prev_humidity","vibration_rms","prev_vibration_rms" ).where("temp_rate_flag OR hum_rate_flag OR vib_rate_flag")
biz_violations.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/biz_rule_outliers")
def parse_semver_col(col):
    # Extract major/minor/patch; non-matching versions become NULL
    return F.when(
        F.col("firmware_version").rlike(CONFIG["domain_rules"]["firmware_semver_regex"]),
        F.struct(
            F.regexp_extract(col, r"^(\d+)", 1).cast("int").alias("maj"),
            F.regexp_extract(col, r"^\d+\.(\d+)", 1).cast("int").alias("min"),
            F.regexp_extract(col, r"^\d+\.\d+\.(\d+)", 1).cast("int").alias("pat")
        )
    ).otherwise(F.lit(None))
df_fw = df.withColumn("semver", parse_semver_col(F.col("firmware_version"))) w_fw = W.partitionBy("device_id").orderBy("timestamp") df_fw = df_fw.withColumn("prev_semver", F.lag("semver").over(w_fw))
downgrades = df_fw.where(
    F.col("semver").isNotNull() & F.col("prev_semver").isNotNull() & (
        (F.col("semver.maj") < F.col("prev_semver.maj")) |
        ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") < F.col("prev_semver.min"))) |
        ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") == F.col("prev_semver.min")) & (F.col("semver.pat") < F.col("prev_semver.pat")))
    )
)
downgrades.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/firmware_downgrades")
upgrades = df_fw.where(F.col("semver").isNotNull() & F.col("prev_semver").isNotNull() & ( (F.col("semver.maj") > F.col("prev_semver.maj")) | ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") > F.col("prev_semver.min"))) | ((F.col("semver.maj") == F.col("prev_semver.maj")) & (F.col("semver.min") == F.col("prev_semver.min")) & (F.col("semver.pat") > F.col("prev_semver.pat"))) )).select("device_id","timestamp","firmware_version").withColumnRenamed("timestamp","upgrade_ts")
if CONFIG["firmware_consistency_check"]["enabled"]: pre_m = CONFIG["firmware_consistency_check"]["pre_window_minutes"] post_m = CONFIG["firmware_consistency_check"]["post_window_minutes"] run_status = CONFIG["firmware_consistency_check"]["status_for_analysis"] metrics_for_fw = CONFIG["firmware_consistency_check"]["metrics"] tol = CONFIG["firmware_consistency_check"]["ratio_tolerance"]
    # Join upgrade events with the same device's data within bounded windows to limit join blow-up
df_run = df.where(F.col("status").isin(run_status)).select("device_id","timestamp", *metrics_for_fw)
    # Prepare: expand each upgrade event into its pre/post window range
upgrades_range = upgrades.withColumn("pre_start", F.expr(f"timestampadd(MINUTE, -{pre_m}, upgrade_ts)")) \
.withColumn("post_end", F.expr(f"timestampadd(MINUTE, {post_m}, upgrade_ts)"))
    # Median aggregation over the pre and post windows
pre_join = df_run.alias("d").join(upgrades_range.alias("u"),
(F.col("d.device_id")==F.col("u.device_id")) & (F.col("d.timestamp")>=F.col("u.pre_start")) & (F.col("d.timestamp")<F.col("u.upgrade_ts")),
"inner")
post_join = df_run.alias("d").join(upgrades_range.alias("u"),
(F.col("d.device_id")==F.col("u.device_id")) & (F.col("d.timestamp")>F.col("u.upgrade_ts")) & (F.col("d.timestamp")<=F.col("u.post_end")),
"inner")
pre_aggs = pre_join.groupBy("u.device_id","u.upgrade_ts").agg(
*[F.expr(f"percentile_approx({m}, 0.5, 1000)").alias(f"pre_{m}_p50") for m in metrics_for_fw]
)
post_aggs = post_join.groupBy("u.device_id","u.upgrade_ts").agg(
*[F.expr(f"percentile_approx({m}, 0.5, 1000)").alias(f"post_{m}_p50") for m in metrics_for_fw]
)
fw_cmp = pre_aggs.join(post_aggs, ["device_id","upgrade_ts"], "inner")
    # Compute the post/pre ratio deviation
for m in metrics_for_fw:
fw_cmp = fw_cmp.withColumn(f"{m}_ratio", F.col(f"post_{m}_p50")/F.col(f"pre_{m}_p50"))
    # Flag events exceeding the tolerance
conds = [ (F.col(f"{m}_ratio") > (1+tol)) | (F.col(f"{m}_ratio") < (1-tol)) for m in metrics_for_fw ]
    fw_inconsistent = fw_cmp.where(reduce(lambda a, b: a | b, conds))  # functools.reduce ORs the per-metric conditions
fw_inconsistent.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/firmware_metric_inconsistency")
logic_examples = df.where(
    (F.col("status").isin(["OFF", "MAINTENANCE"])) &
    (F.col("vibration_rms").isNotNull()) &
    (F.col("vibration_rms") > 0.2)  # example threshold; tune on site
)
logic_examples.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/logic_inconsistency_examples")
summary = {}
summary["total_rows"] = df.count() summary_df = spark.createDataFrame([(k, v) for k, v in summary.items()], schema="metric string, value string") summary_df.write.mode(CONFIG["output"]["save_mode"]).format("delta").save(f"{CONFIG['output']['base_path']}/summary")
def save_count(df_in, name):
cnt = df_in.count()
    # Keep the value column as string so appends match the schema written above
    (spark.createDataFrame([(name, str(cnt))], "metric string, value string")
          .write.mode("append").format("delta").save(f"{CONFIG['output']['base_path']}/summary"))
save_count(dups, "duplicate_pk") save_count(full_dups, "duplicate_rows") save_count(range_violations, "range_violations") save_count(gaps, "time_gaps_events") save_count(retransmissions, "retransmissions") if CONFIG["sampling"]["expect_minute_aligned"]: save_count(misaligned, "misaligned_timestamps") if outlier_union is not None: save_count(outlier_union, "stat_outliers") save_count(biz_violations, "biz_rule_outliers") save_count(dtype_violations, "dtype_format_violations") save_count(time_violations, "time_violations") save_count(downgrades, "firmware_downgrades") if CONFIG["firmware_consistency_check"]["enabled"]: save_count(fw_inconsistent, "firmware_metric_inconsistency") save_count(logic_examples, "logic_inconsistency_examples")
print("Data quality checks completed. Results saved to:", CONFIG["output"]["base_path"])
Usage recommendations and notes:
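Suggested usage, based on the configuration above: run the script as a batch job against the full 90-day window or a partitioned subset; tune CONFIG's ranges, rate limits and IQR multiplier to site conditions before trusting the outlier outputs; and review the per-check Delta outputs plus the summary table under output.base_path after each run.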
Turn complex data-integrity checks into a single reusable prompt: automatically generate runnable Python scripts with validation items tailored to your data, and quickly locate missing values, duplicates, outliers, and rule conflicts. It helps data teams establish a standardized pre-load quality gate before reports go live, models are trained, or data is delivered, saving time, reducing risk, and improving data trustworthiness. Explanations can be output in Chinese or English for cross-team collaboration. Try it and get a script in three steps; the paid tier unlocks advanced validation templates, result summaries, and team collaboration, turning data quality from relying on experience into following a repeatable process.
When taking over a new data source or merging multiple tables, quickly health-check data quality and auto-generate validation and repair scripts, so reports and insights are not distorted by dirty data.
Insert checks at key points in the data pipeline and generate rules and reports with one click, intercepting anomalies early, reducing rollbacks and rework, and stabilizing data ingestion and delivery.
Before a campaign goes live, verify the completeness and consistent definitions of key fields, fix data gaps in time, keep metrics comparable and post-mortems credible, and reduce misjudgments and disputes.
Copy the prompt generated from the template into your preferred chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, making automation and batch processing easy. Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly across AI tools.