Summary of Data Quality Check Methods

Updated Sep 26, 2025

Summarizes five methods for dataset quality checking, with professional technical recommendations.

The following are five categories of data quality checks for the dataset events (fields: id INT, ts TIMESTAMP, src STRING, val DOUBLE; partitioned by dt). Each category includes its objective, rules, and runnable example SQL (written in Spark SQL/Hive SQL style; adjust as needed).

  1. Partition and timestamp consistency check
  • Objective: Ensure the partition key dt matches the record timestamp ts, so records near day boundaries do not land in the wrong partition.

  • Example rules:

    • Every record must satisfy to_date(ts) = to_date(dt).
    • ts must not be NULL or unparsable.
  • Example SQL:
    -- Count of bad records per partition
    SELECT dt, COUNT(*) AS bad_cnt
    FROM events
    WHERE ts IS NULL OR to_date(ts) <> to_date(dt)
    GROUP BY dt;

    -- Rate monitoring (set a threshold, e.g., bad_rate < 0.1%)
    SELECT dt,
           SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) AS bad_cnt,
           COUNT(*) AS total_cnt,
           SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS bad_rate
    FROM events
    GROUP BY dt;

  2. Required-field completeness check (non-null and parseable)
  • Objective: Key fields must be present and usable in their expected types.

  • Example rules:

    • id, ts, and src are required; whether val is required depends on the business definition (in this example it may be NULL).
    • String fields must not be an empty string after trimming leading/trailing whitespace.
  • Example SQL:
    SELECT dt,
           SUM(CASE WHEN id IS NULL THEN 1 ELSE 0 END) AS id_nulls,
           SUM(CASE WHEN ts IS NULL THEN 1 ELSE 0 END) AS ts_nulls,
           SUM(CASE WHEN src IS NULL OR trim(src) = '' THEN 1 ELSE 0 END) AS src_nulls
    FROM events
    GROUP BY dt;

    -- A parse-ability check can also be added (if the raw value arrives as a string, the cast must succeed).
    -- Example: if ts is originally a STRING, use TRY_CAST or a regex pre-check (the exact function depends on the engine); a PySpark sketch follows below.
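
    A minimal PySpark sketch of that parse-ability idea, assuming a hypothetical raw table events_raw whose ts_str column holds the timestamp as a STRING; to_timestamp returns NULL for values it cannot parse, which stands in for TRY_CAST:

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col, to_timestamp

      spark = SparkSession.builder.getOrCreate()

      # events_raw and ts_str are assumptions for illustration only.
      raw = spark.table("events_raw")

      # to_timestamp yields NULL when the string cannot be parsed, so rows where
      # the raw value is present but the parsed value is NULL are unparsable.
      unparsable = (raw
                    .withColumn("ts_parsed", to_timestamp(col("ts_str"), "yyyy-MM-dd HH:mm:ss"))
                    .filter(col("ts_str").isNotNull() & col("ts_parsed").isNull()))

      print("unparsable ts rows:", unparsable.count())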

  3. Uniqueness / duplicate-record check
  • Objective: Identify duplicate events so that downstream aggregation and auditing stay accurate.

  • Example rules:

    • Define a business key (to be confirmed with the business owner). Common candidates: (id, ts, src), (id, src), or (id, dt, src).
    • The examples below use (id, ts, src); the key can be made configurable (see the PySpark sketch after this item).
  • Example SQL:
    -- Duplicate rows by business key
    SELECT dt, id, ts, src, COUNT(*) AS dup_cnt
    FROM events
    GROUP BY dt, id, ts, src
    HAVING COUNT(*) > 1;

    -- Or a quick partition-level estimate of the duplicate rate
    SELECT dt,
           COUNT(*) AS total_cnt,
           COUNT(DISTINCT CONCAT_WS('|', CAST(id AS STRING), CAST(ts AS STRING), src)) AS distinct_cnt,
           1 - COUNT(DISTINCT CONCAT_WS('|', CAST(id AS STRING), CAST(ts AS STRING), src)) / COUNT(*) AS dup_rate
    FROM events
    GROUP BY dt;
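
    A minimal PySpark sketch of a duplicate check whose business key is driven by configuration rather than hard-coded; the key_cols list is an assumption standing in for a value read from a configuration table:

      from pyspark.sql import SparkSession
      from pyspark.sql.functions import col, count

      spark = SparkSession.builder.getOrCreate()
      events = spark.table("events")

      # In practice this list would be loaded from a configuration table;
      # it is hard-coded here only for illustration.
      key_cols = ["id", "ts", "src"]

      # Business keys that occur more than once.
      dups = (events.groupBy(*key_cols)
                    .agg(count("*").alias("dup_cnt"))
                    .filter(col("dup_cnt") > 1))

      print("duplicate business keys:", dups.count())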

  4. Value-range and referential-integrity check
  • Objective: Numeric fields stay within acceptable ranges; categorical fields come from a controlled dictionary.

  • Example rules:

    • val is a valid number: not NULL, not NaN, and within the business thresholds (example: val BETWEEN min_val AND max_val; thresholds to be confirmed with the business).
    • src is in an allow-list or among the valid values of the dimension table dim_src(src_code).
  • Example SQL:
    -- val range check (example thresholds: -1e6 to 1e6)
    SELECT dt,
           SUM(CASE WHEN val IS NULL OR isnan(val) OR val < -1000000 OR val > 1000000 THEN 1 ELSE 0 END) AS bad_val_cnt,
           COUNT(*) AS total_cnt,
           SUM(CASE WHEN val IS NULL OR isnan(val) OR val < -1000000 OR val > 1000000 THEN 1 ELSE 0 END) / COUNT(*) AS bad_val_rate
    FROM events
    GROUP BY dt;

    -- src referential integrity (find unknown sources)
    -- Requires the dimension table dim_src(src_code)
    SELECT e.dt, e.src, COUNT(*) AS unknown_src_cnt
    FROM events e
    LEFT JOIN dim_src d ON e.src = d.src_code
    WHERE d.src_code IS NULL
    GROUP BY e.dt, e.src;

  5. Data freshness and volume-baseline check
  • Objective: Confirm that partitions arrive on time and that row counts stay within a reasonable range, catching delays or large swings early.

  • Example rules:

    • Today's partition should exist by T+0 HH:MM (per the scheduling agreement); alert if it has not arrived.
    • Compare the row count against the last 7 days' baseline and alert when the deviation exceeds a threshold (e.g., ±20%).
  • Example SQL:
    -- Expected-partition check for the last N days (7 here; Spark can generate the date series with sequence)
    WITH cal AS (
      SELECT explode(sequence(date_sub(current_date(), 6), current_date())) AS dt
    ), arrived AS (
      SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
      FROM events
      WHERE to_date(dt) BETWEEN date_sub(current_date(), 6) AND current_date()
      GROUP BY to_date(dt)
    )
    SELECT c.dt,
           COALESCE(a.row_cnt, 0) AS row_cnt,
           CASE WHEN a.row_cnt IS NULL THEN 'MISSING_PARTITION' ELSE 'OK' END AS status
    FROM cal c
    LEFT JOIN arrived a USING (dt);

    -- Today's row count vs. the last-7-day baseline (median / approximate percentile of the last 7 days' row_cnt)
    WITH hist AS (
      SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
      FROM events
      WHERE to_date(dt) BETWEEN date_sub(current_date(), 7) AND date_sub(current_date(), 1)
      GROUP BY to_date(dt)
    ), today AS (
      SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
      FROM events
      WHERE to_date(dt) = current_date()
      GROUP BY to_date(dt)
    )
    SELECT t.dt,
           t.row_cnt AS today_cnt,
           h.p50_last7,
           CASE WHEN t.row_cnt < 0.8 * h.p50_last7 THEN 'LOW_VOLUME'
                WHEN t.row_cnt > 1.2 * h.p50_last7 THEN 'HIGH_VOLUME'
                ELSE 'OK' END AS volume_status
    FROM today t
    CROSS JOIN (SELECT percentile_approx(row_cnt, 0.5) AS p50_last7 FROM hist) h;

Implementation suggestions

  • Parameterize the rules above (required-field set, business key, val thresholds, src allow-list, deviation thresholds, deadlines, etc.) and store them in a configuration table so they can be reused across tables.
  • Orchestrate the checks as data-quality jobs (e.g., Airflow/Argo/another scheduler) and write the results to an audit table dq_results with fields such as table name, partition, rule ID, measured value, threshold, status, and run time (a sketch follows after this list).
  • Trigger alerts for failed rules (Slack/email/alerting platform) and, depending on severity, either block downstream jobs or allow degraded pass-through.
  • For duplicate and out-of-range data, add automatic remediation (e.g., deduplicate and rewrite, or isolate the rows into a quarantine table with the error reason attached).
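
  A minimal PySpark sketch of running one parameterized rule and appending its outcome to the dq_results audit table suggested above; the rule dictionary, the threshold value, and the dq_results schema are assumptions for illustration:

    from datetime import datetime
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Hypothetical rule definition; in practice it would be loaded from a configuration table.
    rule = {
        "rule_id": "events_ts_partition_consistency",
        "table": "events",
        "sql": """
            SELECT SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS metric
            FROM events
            WHERE to_date(dt) = current_date()
        """,
        "threshold": 0.001,  # bad_rate must stay below 0.1%
    }

    metric = spark.sql(rule["sql"]).collect()[0]["metric"] or 0.0
    status = "PASS" if metric < rule["threshold"] else "FAIL"

    # Append one row per rule run to the audit table (schema is an assumption).
    run_ts = datetime.utcnow()
    result = spark.createDataFrame(
        [(rule["table"], str(run_ts.date()), rule["rule_id"], float(metric), rule["threshold"], status, run_ts)],
        "table_name STRING, dt STRING, rule_id STRING, metric DOUBLE, threshold DOUBLE, status STRING, run_ts TIMESTAMP",
    )
    result.write.mode("append").saveAsTable("dq_results")

  A real job would loop over all configured rules, then alert or block downstream tasks whenever status is FAIL.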

Below are five data quality checks for a “paid” dataset derived from table orders with fields: oid (INT), uid (INT), pay_ts (TIMESTAMP), amt (DECIMAL). Each check includes its objective, rule, and an example SQL query (ANSI SQL) applicable to most data warehouses.

  1. Paid definition compliance and required-field completeness
  • Objective: Ensure the dataset strictly adheres to the “paid” definition and contains required values.

  • Rule:

    • Include only rows where pay_ts IS NOT NULL.
    • amt must be positive.
    • oid and uid must be present (NOT NULL).
  • Example SQL:
    SELECT SUM(CASE WHEN pay_ts IS NULL THEN 1 ELSE 0 END) AS violations_missing_pay_ts,
           SUM(CASE WHEN amt IS NULL OR amt <= 0 THEN 1 ELSE 0 END) AS violations_amt_nonpositive_or_null,
           SUM(CASE WHEN oid IS NULL THEN 1 ELSE 0 END) AS violations_missing_oid,
           SUM(CASE WHEN uid IS NULL THEN 1 ELSE 0 END) AS violations_missing_uid
    FROM orders;

    -- Optional hard filter audit (ensures no unpaid rows leak into the “paid” dataset)
    SELECT COUNT(*) AS unpaid_rows_present
    FROM orders
    WHERE pay_ts IS NULL;

  2. Primary key uniqueness (duplicate detection)
  • Objective: Prevent duplicate paid orders for the same oid.

  • Rule: Each oid appears at most once in the paid dataset.

  • Example SQL:
    SELECT oid, COUNT(*) AS dup_count
    FROM orders
    WHERE pay_ts IS NOT NULL
    GROUP BY oid
    HAVING COUNT(*) > 1;

    -- If duplicates exist, inspect conflicting values
    SELECT o.*
    FROM (
      SELECT oid
      FROM orders
      WHERE pay_ts IS NOT NULL
      GROUP BY oid
      HAVING COUNT(*) > 1
    ) d
    JOIN orders o USING (oid)
    ORDER BY o.oid, o.pay_ts;

  3. Timestamp validity (temporal sanity)
  • Objective: Validate pay_ts against reasonable temporal bounds to catch clock, timezone, or ingestion errors.
  • Rule:
    • pay_ts must not be in the future beyond a small tolerance.
    • pay_ts must be later than a configured lower bound (e.g., system go‑live date).
  • Example SQL (using a 5-minute future tolerance and a configurable lower bound):
    WITH params AS (
      SELECT CURRENT_TIMESTAMP AS now_ts,
             TIMESTAMP '2020-01-01 00:00:00' AS lower_bound_ts
    )
    SELECT COUNT(*) AS invalid_timestamps
    FROM orders o
    CROSS JOIN params p
    WHERE o.pay_ts IS NOT NULL
      AND (o.pay_ts > p.now_ts + INTERVAL '5' MINUTE OR o.pay_ts < p.lower_bound_ts);

  4. Referential integrity of uid (if a user dimension exists)
  • Objective: Ensure that uid in orders maps to a valid user record.

  • Rule: For paid rows, uid must exist in the users dimension (or source of truth).

  • Example SQL:
    SELECT COUNT(*) AS missing_users
    FROM orders o
    LEFT JOIN dim_users u ON o.uid = u.uid
    WHERE o.pay_ts IS NOT NULL
      AND u.uid IS NULL;

    -- If there is a known set of valid uids (e.g., from a lookup table), use that instead of dim_users.

  5. Completeness and reconciliation of paid counts/amounts (source vs. curated)
  • Objective: Detect loss or duplication between source events and the curated paid dataset.
  • Rule:
    • For each date, paid counts and sums should reconcile within a defined tolerance against the authoritative source.
  • Example SQL (assuming a source table orders_src contains raw payment events):
    WITH src AS (
      SELECT CAST(pay_ts AS DATE) AS dt, COUNT(*) AS src_cnt, SUM(amt) AS src_amt
      FROM orders_src
      WHERE pay_ts IS NOT NULL
      GROUP BY CAST(pay_ts AS DATE)
    ), tgt AS (
      SELECT CAST(pay_ts AS DATE) AS dt, COUNT(*) AS tgt_cnt, SUM(amt) AS tgt_amt
      FROM orders
      WHERE pay_ts IS NOT NULL
      GROUP BY CAST(pay_ts AS DATE)
    )
    SELECT s.dt, s.src_cnt, t.tgt_cnt, s.src_amt, t.tgt_amt,
           (t.tgt_cnt - s.src_cnt) AS diff_cnt,
           (t.tgt_amt - s.src_amt) AS diff_amt
    FROM src s
    JOIN tgt t USING (dt)
    WHERE ABS(t.tgt_cnt - s.src_cnt) > 0
       OR ABS(t.tgt_amt - s.src_amt) > 0;

Implementation notes:

  • If orders_src or dim_users is not available, substitute the appropriate upstream/source-of-truth tables for reconciliation and referential integrity checks.
  • Thresholds (future tolerance, lower bound dates, amount maxima) should be parameterized and aligned with business SLAs and known system constraints.
  • These checks can be automated via scheduled jobs or embedded into a data quality framework (e.g., Great Expectations, dbt tests, or Airflow sensors) and should emit metrics to monitoring/alerting systems; one possible shape is sketched below.
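
  As one possible shape for that automation, here is a minimal Python sketch that runs the uniqueness check above through a standard DB-API 2.0 connection and fails fast; how the connection is obtained is left to the caller and is an assumption of this sketch:

    DUPLICATE_CHECK_SQL = """
        SELECT COUNT(*) FROM (
            SELECT oid
            FROM orders
            WHERE pay_ts IS NOT NULL
            GROUP BY oid
            HAVING COUNT(*) > 1
        ) d
    """

    def run_duplicate_check(conn) -> int:
        """Return the number of oids that appear more than once among paid rows."""
        cur = conn.cursor()
        cur.execute(DUPLICATE_CHECK_SQL)
        (dup_keys,) = cur.fetchone()
        return dup_keys

    def main(conn) -> None:
        dups = run_duplicate_check(conn)
        print(f"duplicate_paid_oids={dups}")  # emit as a metric or structured log line
        if dups > 0:
            raise SystemExit("DQ check failed: duplicate paid orders found")

  Passing any warehouse driver's DB-API connection (psycopg2, pyodbc, etc.) to main() and scheduling it from cron or an Airflow task is enough to turn the query into a recurring, alerting check.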

Below are five practical data quality checks tailored for the training dataset clicks with fields: sid (int), label (binary), age (int), last_ts (timestamp), ctr (double). Each check includes what to assert and an example implementation (PySpark) you can embed in a data validation job.

  1. Schema and Mandatory Field Conformance
  • Assertion:
    • Columns exist with expected types: sid:int, label:int (binary), age:int, last_ts:timestamp, ctr:double.
    • Mandatory fields are non-null: sid, label, last_ts.
  • Rationale: Prevents ingestion of malformed records and ensures downstream transformations do not fail due to type or null issues.
  • Example (PySpark):
    • Enforce schema on read:
      from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType

      schema = StructType([
          StructField("sid", IntegerType(), False),
          StructField("label", IntegerType(), False),
          StructField("age", IntegerType(), True),
          StructField("last_ts", TimestampType(), False),
          StructField("ctr", DoubleType(), True),
      ])
      df = spark.read.schema(schema).parquet(".../clicks")
    • Null check:
      from pyspark.sql.functions import col

      violations = df.filter(
          col("sid").isNull() | col("label").isNull() | col("last_ts").isNull()
      ).count()
      assert violations == 0, f"Nulls found in mandatory fields: {violations}"
  2. Identifier Uniqueness and Duplicate Detection
  • Assertion:
    • If sid is expected to uniquely identify training records, enforce uniqueness (no duplicate sids).
    • If duplicates can occur (e.g., multiple events per sid), flag the duplicate rate and apply a deterministic dedup rule (e.g., keep the latest last_ts).
  • Rationale: Duplicate records distort distributions and can bias model training.
  • Example (PySpark):
    • Detect duplicates:
      dup_df = df.groupBy("sid").count().filter(col("count") > 1)
      dup_rate = dup_df.count() / df.select("sid").distinct().count()
      assert dup_rate == 0, f"Duplicate sid rate: {dup_rate:.6f}"
    • Optional dedup (keep most recent):
      from pyspark.sql import Window
      from pyspark.sql.functions import row_number

      window = Window.partitionBy("sid").orderBy(col("last_ts").desc())
      df_dedup = df.withColumn("rn", row_number().over(window)).filter(col("rn") == 1).drop("rn")
  3. Domain and Range Constraints
  • Assertion:

    • label ∈ {0, 1}
    • age ∈ [0, 120] (adjust threshold to business context)
    • ctr ∈ [0.0, 1.0]
    • last_ts ≤ current time (no future timestamps)
  • Rationale: Catches impossible or out-of-contract values that indicate upstream defects or parsing issues.

  • Example (PySpark):
      from pyspark.sql.functions import current_timestamp

      invalid = df.filter(
          (~col("label").isin(0, 1))
          | (col("age").isNotNull() & ((col("age") < 0) | (col("age") > 120)))
          | (col("ctr").isNotNull() & ((col("ctr") < 0.0) | (col("ctr") > 1.0)))
          | (col("last_ts") > current_timestamp())
      )
      invalid_cnt = invalid.count()
      assert invalid_cnt == 0, f"Domain/range violations: {invalid_cnt}"

  4. Timeliness and Coverage (Freshness within Training Window)
  • Assertion:

    • The dataset’s max(last_ts) is within the expected SLA window (e.g., within 24 hours of now for daily training).
    • The dataset covers the intended training period (e.g., min(last_ts) and max(last_ts) span expected window).
  • Rationale: Ensures the model trains on timely and representative data; stale inputs degrade performance.

  • Example (PySpark):
      from pyspark.sql.functions import max as spark_max, min as spark_min
      import datetime

      stats = df.agg(spark_min("last_ts").alias("min_ts"), spark_max("last_ts").alias("max_ts")).collect()[0]
      now = datetime.datetime.utcnow()
      freshness_ok = (now - stats["max_ts"]).total_seconds() <= 24 * 3600  # 24h SLA
      assert freshness_ok, f"Data not fresh. max(last_ts)={stats['max_ts']} UTC"

      # Optional coverage check for a 7-day window
      coverage_ok = (stats["max_ts"] - stats["min_ts"]).days >= 7
      assert coverage_ok, f"Insufficient coverage: {stats['min_ts']}..{stats['max_ts']}"

  5. Missingness and Distribution Stability (Drift Detection)
  • Assertion:

    • Missingness thresholds:
      • age null rate ≤ 1% (example)
      • ctr null rate ≤ 1% (example)
    • Distribution stability vs. a baseline snapshot (e.g., previous day/week):
      • label rate within expected band (e.g., 0.3–0.7 if historically balanced)
      • ctr mean/variance within tolerance
      • age distribution similarity (use PSI or KS test; a PSI sketch follows after the example below)
  • Rationale: Detects upstream shifts and silent failures that maintain schema but change data characteristics.

  • Example (PySpark):
      from pyspark.sql.functions import mean, stddev

      total = df.count()
      age_null_rate = df.filter(col("age").isNull()).count() / total
      ctr_null_rate = df.filter(col("ctr").isNull()).count() / total
      assert age_null_rate <= 0.01, f"age null rate too high: {age_null_rate:.4f}"
      assert ctr_null_rate <= 0.01, f"ctr null rate too high: {ctr_null_rate:.4f}"

      label_rate = df.filter(col("label") == 1).count() / total
      ctr_stats = df.select(mean("ctr").alias("ctr_mean"), stddev("ctr").alias("ctr_std")).collect()[0]

      # Compare against stored baselines (example; load from a configuration table or file)
      baseline = {"label_rate": 0.5, "ctr_mean": 0.08, "ctr_std": 0.12,
                  "label_rate_tol": 0.05, "ctr_mean_tol": 0.01, "ctr_std_tol": 0.02}

      assert abs(label_rate - baseline["label_rate"]) <= baseline["label_rate_tol"], "Label rate drift"
      assert abs(ctr_stats["ctr_mean"] - baseline["ctr_mean"]) <= baseline["ctr_mean_tol"], "CTR mean drift"
      assert abs(ctr_stats["ctr_std"] - baseline["ctr_std"]) <= baseline["ctr_std_tol"], "CTR std drift"

Notes for productionization:

  • Run these checks as part of your pipeline’s validation stage and fail fast with clear incident logs.
  • Persist metrics (null rates, duplicate rates, distribution stats) to a data quality table for trend analysis (a sketch follows after this list).
  • Calibrate thresholds to historical data and business requirements; start with conservative bounds and refine over time.
  • Where duplicates are expected, define and apply deterministic deduplication logic before training.
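
  A minimal sketch of that metrics-persistence idea, reusing spark and the metrics computed in the checks above (age_null_rate, ctr_null_rate, dup_rate, label_rate); the dq_metrics table name and schema are assumptions:

      import datetime

      run_ts = datetime.datetime.utcnow()
      metrics_row = [(
          "clicks",   # dataset name
          run_ts,     # run timestamp
          float(age_null_rate),
          float(ctr_null_rate),
          float(dup_rate),
          float(label_rate),
      )]
      metrics_df = spark.createDataFrame(
          metrics_row,
          "dataset STRING, run_ts TIMESTAMP, age_null_rate DOUBLE, ctr_null_rate DOUBLE, dup_rate DOUBLE, label_rate DOUBLE",
      )
      metrics_df.write.mode("append").saveAsTable("dq_metrics")

  Appending one row per run keeps a history that can be charted or queried later to calibrate the thresholds mentioned above.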

Example Details

Problems Solved

After providing a dataset's key attributes, any data team immediately receives a "five major data quality checks" plan tailored to that data, presented in a clear, executable, reviewable structure for rapid adoption. Expert-level, cross-language output helps standardize the quality-check process, reduces repeated communication and rework, and lowers the business risk caused by data issues. Enter your dataset's key information to quickly generate a fitted checklist and action recommendations, moving the team from reactive troubleshooting to proactive prevention.

Target Users

Data engineers

When building or refactoring data pipelines, quickly produce a checklist covering missing values, anomalies, duplicates, consistency, and timeliness for pre-production validation and release approval. When incidents occur, follow the steps to locate the problem and shorten recovery time.

Data analysts / BI engineers

When reports look wrong or metrics fluctuate, get a clear verification path and data-consistency checks to keep definitions stable. Generate readable explanations for communicating with the business, reproducing issues, and correcting them.

Machine learning engineers

During training-set and feature construction, generate sample-quality checks and remediation suggestions to keep data stable and improve model performance. Multilingual output supports team collaboration.

Feature Summary

Generates a quality checklist matched to your data attributes in one step, with clear rules, metrics, and actionable steps.
Automatically identifies missing, anomalous, duplicate, and inconsistent data, with executable remediation suggestions and verification methods.
Supports multilingual output for cross-team sharing and reporting, reducing communication cost and speeding up issue resolution.
Organizes checks around the business workflow, covering risk points across ingestion, transformation, storage, and data access.
Provides structured reports and result summaries to support quick decisions, prioritization, and follow-up ownership.
Lets you customize check depth and focus by scenario, fitting e-commerce, finance, advertising, and other businesses.
Offers ready-to-use inspection checklists before launches and after pipeline changes, reducing incident probability and protecting key metrics.
Helps teams establish shared quality standards and terminology, improving collaboration and reducing rework and disputes.
Quickly produces post-incident review points from historical issues, accumulating best practices and continuously improving data trustworthiness.
Reusable as teaching and training material, helping newcomers quickly grasp quality thinking and common troubleshooting techniques.

How to Use the Purchased Prompt Template

1. Use directly in an external chat app

Copy the prompt generated by the template into your usual chat app (such as ChatGPT or Claude) and use it in conversation directly, with no extra development. Suitable for personal quick trials and lightweight use.

2. Publish as an API

Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Suitable for developer integration and embedding into business systems.

3. Configure in an MCP client

Configure the corresponding server address in your MCP client so your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between AI tools.

AI Prompt Price
¥15.00
Try before you buy — pay only after it works for you.

What You Get After Purchase

The complete prompt template
- 231 tokens in total
- 2 adjustable parameters
{ dataset attributes } { output language }
Access to community-contributed content
- Curated community examples to help you get started with the prompt quickly
Free for a limited time
