Summary of Data Quality Check Methods

Updated Sep 26, 2025

Summarizes five methods for checking dataset quality and provides professional technical recommendations.

Example 1

The following are five categories of data quality checks for the dataset events (fields: id INT, ts TIMESTAMP, src STRING, val DOUBLE; partitioned by dt). Each category lists the objective, the rules, and runnable example SQL (written in a Spark SQL / Hive SQL style; adjust to your engine as needed).

1) Partition and timestamp consistency check
- Objective: Ensure the partition key dt is consistent with the record timestamp ts, so records do not fall into the wrong daily partition.
- Example rules:
  - Every record must satisfy to_date(ts) = to_date(dt).
  - ts must not be NULL or invalid.
- Example SQL:
  -- Count of bad records (per partition)
  SELECT dt, COUNT(*) AS bad_cnt
  FROM events
  WHERE ts IS NULL OR to_date(ts) <> to_date(dt)
  GROUP BY dt;

  -- Ratio monitoring (a threshold can be set, e.g., bad_rate < 0.1%; see the gating sketch below)
  SELECT dt,
         SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) AS bad_cnt,
         COUNT(*) AS total_cnt,
         SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS bad_rate
  FROM events
  GROUP BY dt;
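
- Optional gating sketch (PySpark): a minimal illustration of turning the bad_rate query above into a pass/fail gate; this is an assumption-laden sketch, not part of the original check. It assumes an active SparkSession named spark, the events table registered in the catalog, and the 0.1% threshold mentioned above.
  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()
  BAD_RATE_THRESHOLD = 0.001  # 0.1%; align with the business SLA

  # Reuse the monitoring query and collect per-partition bad rates (one row per dt)
  rows = spark.sql("""
      SELECT dt,
             SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS bad_rate
      FROM events
      GROUP BY dt
  """).collect()

  failed = [(r["dt"], r["bad_rate"]) for r in rows if r["bad_rate"] > BAD_RATE_THRESHOLD]
  assert not failed, f"Partition/timestamp consistency check failed for: {failed}"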

2) Mandatory-field completeness check (non-null and parseable)
- Objective: Key fields must be present and usable at the expected type.
- Example rules:
  - id, ts, and src are mandatory; whether val is mandatory depends on the business definition (in this example it may be NULL).
  - String fields must not be empty after trimming leading/trailing whitespace.
- Example SQL:
  SELECT dt,
         SUM(CASE WHEN id IS NULL THEN 1 ELSE 0 END) AS id_nulls,
         SUM(CASE WHEN ts IS NULL THEN 1 ELSE 0 END) AS ts_nulls,
         SUM(CASE WHEN src IS NULL OR trim(src) = '' THEN 1 ELSE 0 END) AS src_nulls
  FROM events
  GROUP BY dt;

  -- A type-parseability check can be added (if the raw value is a string, the CAST must succeed)
  -- Example: if ts arrives as STRING, use TRY_CAST or a regex pre-check (the exact function depends on the engine); see the PySpark sketch below.
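
- Optional parseability sketch (PySpark): an illustrative sketch only. It assumes a hypothetical staging table events_raw in which ts arrives as a STRING column ts_raw; with ANSI mode off, to_timestamp() returns NULL for values it cannot parse, playing the role of TRY_CAST.
  from pyspark.sql import SparkSession
  from pyspark.sql.functions import col, to_timestamp

  spark = SparkSession.builder.getOrCreate()

  raw = spark.table("events_raw")  # hypothetical staging table; adjust name and timestamp pattern to your pipeline
  unparseable_cnt = (
      raw.withColumn("ts_parsed", to_timestamp(col("ts_raw"), "yyyy-MM-dd HH:mm:ss"))
         .filter(col("ts_raw").isNotNull() & col("ts_parsed").isNull())
         .count()
  )
  assert unparseable_cnt == 0, f"Unparseable ts_raw values: {unparseable_cnt}"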

3) Uniqueness / duplicate-record check
- Objective: Identify duplicate events to keep downstream aggregations and audits accurate.
- Example rules:
  - Define the business key (confirm with the business owner). Common candidates: (id, ts, src), (id, src), or (id, dt, src).
  - The examples below use (id, ts, src); make it configurable via parameters.
- Example SQL:
  -- Duplicate rows by business key
  SELECT dt, id, ts, src, COUNT(*) AS dup_cnt
  FROM events
  GROUP BY dt, id, ts, src
  HAVING COUNT(*) > 1;

  -- Or a quick assessment of the duplicate rate at the partition level
  SELECT dt,
         COUNT(*) AS total_cnt,
         COUNT(DISTINCT CONCAT_WS('|', CAST(id AS STRING), CAST(ts AS STRING), src)) AS distinct_cnt,
         1 - COUNT(DISTINCT CONCAT_WS('|', CAST(id AS STRING), CAST(ts AS STRING), src)) / COUNT(*) AS dup_rate
  FROM events
  GROUP BY dt;

4) Value-range and referential-integrity check
- Objective: Numeric fields fall within an acceptable range; categorical fields belong to a controlled dictionary.
- Example rules:
  - val is a valid number: not NULL, not NaN, and within the business thresholds (example: val BETWEEN min_val AND max_val; confirm the thresholds with the business).
  - src is in the whitelist or is a valid value in the dimension table dim_src(src_code).
- Example SQL:
  -- val range validation (example thresholds: -1e6 to 1e6)
  SELECT dt,
         SUM(CASE WHEN val IS NULL OR isnan(val) OR val < -1000000 OR val > 1000000 THEN 1 ELSE 0 END) AS bad_val_cnt,
         COUNT(*) AS total_cnt,
         SUM(CASE WHEN val IS NULL OR isnan(val) OR val < -1000000 OR val > 1000000 THEN 1 ELSE 0 END) / COUNT(*) AS bad_val_rate
  FROM events
  GROUP BY dt;

  -- src referential integrity (find unknown sources)
  -- Requires the dimension table dim_src(src_code)
  SELECT e.dt, e.src, COUNT(*) AS unknown_src_cnt
  FROM events e
  LEFT JOIN dim_src d
    ON e.src = d.src_code
  WHERE d.src_code IS NULL
  GROUP BY e.dt, e.src;

5) Data freshness and volume-baseline check
- Objective: Ensure partitions arrive on time and row counts stay within a reasonable range, so data delays or large swings are caught early.
- Example rules:
  - Today's partition should exist by T+0 HH:MM (per the scheduling agreement); alert if it has not arrived.
  - Compare row counts against the last-7-day baseline; alert when the deviation exceeds a threshold (e.g., ±20%).
- Example SQL:
  -- Detect expected partitions over the last N days (7 days here; Spark can generate the date series with sequence)
  WITH cal AS (
    SELECT explode(sequence(date_sub(current_date(), 6), current_date())) AS dt
  ),
  arrived AS (
    SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
    FROM events
    WHERE to_date(dt) BETWEEN date_sub(current_date(), 6) AND current_date()
    GROUP BY to_date(dt)
  )
  SELECT c.dt,
         COALESCE(a.row_cnt, 0) AS row_cnt,
         CASE WHEN a.row_cnt IS NULL THEN 'MISSING_PARTITION' ELSE 'OK' END AS status
  FROM cal c
  LEFT JOIN arrived a USING (dt);

  -- Today's row count vs. the last-7-day baseline (median or approximate percentile of the last 7 days' row_cnt)
  WITH hist AS (
    SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
    FROM events
    WHERE to_date(dt) BETWEEN date_sub(current_date(), 7) AND date_sub(current_date(), 1)
    GROUP BY to_date(dt)
  ),
  today AS (
    SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
    FROM events
    WHERE to_date(dt) = current_date()
    GROUP BY to_date(dt)
  )
  SELECT t.dt,
         t.row_cnt AS today_cnt,
         h.p50_last7,
         CASE
           WHEN t.row_cnt < 0.8 * h.p50_last7 THEN 'LOW_VOLUME'
           WHEN t.row_cnt > 1.2 * h.p50_last7 THEN 'HIGH_VOLUME'
           ELSE 'OK'
         END AS volume_status
  FROM today t
  CROSS JOIN (SELECT percentile_approx(row_cnt, 0.5) AS p50_last7 FROM hist) h;

Implementation recommendations
- Parameterize the rules above (the set of mandatory fields, the business key, val thresholds, the src whitelist, deviation thresholds, deadlines, etc.) and store them in a configuration table so they can be reused across tables.
- Orchestrate them as data quality jobs (e.g., Airflow/Argo/your scheduler) and write the check results to an audit table dq_results with columns such as table name, partition, rule ID, measured value, threshold, status, and run time; a minimal sketch of such a writer follows this list.
- Trigger alerts for failed rules (Slack/email/alerting platform) and, depending on severity, either block downstream jobs or allow a degraded pass.
- For duplicate and out-of-range data, add automatic remediation strategies (e.g., dedup-and-rewrite, or isolate the rows into a quarantine table together with the error reason).
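
A minimal PySpark sketch of the dq_results writer mentioned above (an illustration under stated assumptions, not a reference implementation): it assumes an active SparkSession named spark, a writable table named dq_results, and a hypothetical rule list of (rule_id, sql, threshold) in which each query returns columns dt and bad_rate.

  from datetime import datetime, timezone
  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  # Hypothetical parameterized rules; in practice, load these from a configuration table
  RULES = [
      ("R001_partition_ts_consistency",
       """SELECT dt,
                 SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS bad_rate
          FROM events GROUP BY dt""",
       0.001),
  ]

  run_ts = datetime.now(timezone.utc).isoformat()
  results = []
  for rule_id, sql, threshold in RULES:
      for row in spark.sql(sql).collect():
          status = "FAIL" if row["bad_rate"] > threshold else "PASS"
          results.append(("events", str(row["dt"]), rule_id, float(row["bad_rate"]), threshold, status, run_ts))

  (spark.createDataFrame(
       results,
       "table_name string, dt string, rule_id string, metric double, threshold double, status string, run_ts string")
       .write.mode("append")
       .saveAsTable("dq_results"))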

Example 2

Below are five data quality checks for a “paid” dataset derived from table orders with fields: oid (INT), uid (INT), pay_ts (TIMESTAMP), amt (DECIMAL). Each check includes its objective, rule, and an example SQL query (ANSI SQL) applicable to most data warehouses.

1) Paid definition compliance and required-field completeness
- Objective: Ensure the dataset strictly adheres to the “paid” definition and contains required values.
- Rule:
  - Include only rows where pay_ts IS NOT NULL.
  - amt must be positive.
  - oid and uid must be present (NOT NULL).
- Example SQL:
  SELECT
    SUM(CASE WHEN pay_ts IS NULL THEN 1 ELSE 0 END) AS violations_missing_pay_ts,
    SUM(CASE WHEN amt IS NULL OR amt <= 0 THEN 1 ELSE 0 END) AS violations_amt_nonpositive_or_null,
    SUM(CASE WHEN oid IS NULL THEN 1 ELSE 0 END) AS violations_missing_oid,
    SUM(CASE WHEN uid IS NULL THEN 1 ELSE 0 END) AS violations_missing_uid
  FROM orders;  -- scan all rows; each CASE expression isolates one violation type

  -- Optional hard filter audit (ensures no unpaid rows leak into “paid” dataset)
  SELECT COUNT(*) AS unpaid_rows_present
  FROM orders
  WHERE pay_ts IS NULL;

2) Primary key uniqueness (duplicate detection)
- Objective: Prevent duplicate paid orders for the same oid.
- Rule: Each oid appears at most once in the paid dataset.
- Example SQL:
  SELECT oid, COUNT(*) AS dup_count
  FROM orders
  WHERE pay_ts IS NOT NULL
  GROUP BY oid
  HAVING COUNT(*) > 1;

  -- If duplicates exist, inspect conflicting values
  SELECT o.*
  FROM (
    SELECT oid
    FROM orders
    WHERE pay_ts IS NOT NULL
    GROUP BY oid
    HAVING COUNT(*) > 1
  ) d
  JOIN orders o USING (oid)
  ORDER BY o.oid, o.pay_ts;

3) Timestamp validity (temporal sanity)
- Objective: Validate pay_ts against reasonable temporal bounds to catch clock, timezone, or ingestion errors.
- Rule:
  - pay_ts must not be in the future beyond a small tolerance.
  - pay_ts must be later than a configured lower bound (e.g., system go‑live date).
- Example SQL (using a 5-minute future tolerance and a configurable lower bound):
  WITH params AS (
    SELECT
      CURRENT_TIMESTAMP AS now_ts,
      TIMESTAMP '2020-01-01 00:00:00' AS lower_bound_ts
  )
  SELECT COUNT(*) AS invalid_timestamps
  FROM orders o
  CROSS JOIN params p
  WHERE o.pay_ts IS NOT NULL
    AND (o.pay_ts > p.now_ts + INTERVAL '5' MINUTE OR o.pay_ts < p.lower_bound_ts);

4) Referential integrity of uid (if a user dimension exists)
- Objective: Ensure that uid in orders maps to a valid user record.
- Rule: For paid rows, uid must exist in the users dimension (or source of truth).
- Example SQL:
  SELECT COUNT(*) AS missing_users
  FROM orders o
  LEFT JOIN dim_users u ON o.uid = u.uid
  WHERE o.pay_ts IS NOT NULL
    AND u.uid IS NULL;

  -- If there is a known set of valid uids (e.g., from a lookup table), use that instead of dim_users.

5) Completeness and reconciliation of paid counts/amounts (source vs. curated)
- Objective: Detect loss or duplication between source events and the curated paid dataset.
- Rule:
  - For each date, paid counts and sums should reconcile within a defined tolerance against the authoritative source.
- Example SQL (assuming a source table orders_src contains raw payment events):
  WITH src AS (
    SELECT CAST(pay_ts AS DATE) AS dt,
           COUNT(*) AS src_cnt,
           SUM(amt) AS src_amt
    FROM orders_src
    WHERE pay_ts IS NOT NULL
    GROUP BY CAST(pay_ts AS DATE)
  ),
  tgt AS (
    SELECT CAST(pay_ts AS DATE) AS dt,
           COUNT(*) AS tgt_cnt,
           SUM(amt) AS tgt_amt
    FROM orders
    WHERE pay_ts IS NOT NULL
    GROUP BY CAST(pay_ts AS DATE)
  )
  SELECT s.dt,
         s.src_cnt, t.tgt_cnt,
         s.src_amt, t.tgt_amt,
         (t.tgt_cnt - s.src_cnt) AS diff_cnt,
         (t.tgt_amt - s.src_amt) AS diff_amt
  FROM src s
  JOIN tgt t USING (dt)
  WHERE ABS(t.tgt_cnt - s.src_cnt) > 0   -- replace 0 with the agreed count tolerance
     OR ABS(t.tgt_amt - s.src_amt) > 0;  -- replace 0 with the agreed amount tolerance

Implementation notes:
- If orders_src or dim_users is not available, substitute the appropriate upstream/source-of-truth tables for reconciliation and referential integrity checks.
- Thresholds (future tolerance, lower bound dates, amount maxima) should be parameterized and aligned with business SLAs and known system constraints; see the sketch after this list.
- These checks can be automated via scheduled jobs or embedded into a data quality framework (e.g., Great Expectations, dbt tests, or Airflow sensors) and should emit metrics to monitoring/alerting systems.
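
As a sketch of the threshold parameterization mentioned above (illustrative only; run_scalar is a hypothetical helper standing in for your warehouse client, and the parameter values are placeholders to be aligned with your SLAs):

  from datetime import datetime, timezone

  # Hypothetical, centrally managed parameters (e.g., loaded from a config table or YAML)
  PARAMS = {
      "future_tolerance_minutes": 5,
      "lower_bound_ts": "2020-01-01 00:00:00",
  }

  SQL = """
      SELECT COUNT(*)
      FROM orders o
      WHERE o.pay_ts IS NOT NULL
        AND (o.pay_ts > CURRENT_TIMESTAMP + INTERVAL '{future_tolerance_minutes}' MINUTE
             OR o.pay_ts < TIMESTAMP '{lower_bound_ts}')
  """.format(**PARAMS)

  def run_scalar(sql: str) -> int:
      """Hypothetical helper: execute sql on the warehouse and return a single integer."""
      raise NotImplementedError("wire this to your warehouse client (JDBC, DB-API driver, etc.)")

  invalid_timestamps = run_scalar(SQL)
  if invalid_timestamps > 0:
      raise ValueError(f"{invalid_timestamps} rows violate the pay_ts validity rule "
                       f"(params={PARAMS}, checked at {datetime.now(timezone.utc).isoformat()})")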

Example 3

Below are five practical data quality checks tailored for the training dataset clicks with fields: sid (int), label (binary), age (int), last_ts (timestamp), ctr (double). Each check includes what to assert and an example implementation (PySpark) you can embed in a data validation job.

1) Schema and Mandatory Field Conformance
- Assertion:
  - Columns exist with expected types: sid:int, label:int (binary), age:int, last_ts:timestamp, ctr:double.
  - Mandatory fields are non-null: sid, label, last_ts.
- Rationale: Prevents ingestion of malformed records and ensures downstream transformations do not fail due to type or null issues.
- Example (PySpark):
  - Enforce schema on read:
    from pyspark.sql.types import (StructType, StructField, IntegerType,
                                   TimestampType, DoubleType)

    schema = StructType([
      StructField("sid", IntegerType(), False),
      StructField("label", IntegerType(), False),
      StructField("age", IntegerType(), True),
      StructField("last_ts", TimestampType(), False),
      StructField("ctr", DoubleType(), True),
    ])
    df = spark.read.schema(schema).parquet(".../clicks")
  - Null check:
    from pyspark.sql.functions import col

    violations = df.filter(
      col("sid").isNull() | col("label").isNull() | col("last_ts").isNull()
    ).count()
    assert violations == 0, f"Nulls found in mandatory fields: {violations}"

2) Identifier Uniqueness and Duplicate Detection
- Assertion:
  - If sid is expected to uniquely identify training records, enforce uniqueness (no duplicate sids).
  - If duplicates can occur (e.g., multiple events per sid), flag the duplicate rate and apply a deterministic dedup rule (e.g., keep the latest last_ts).
- Rationale: Duplicate records distort distributions and can bias model training.
- Example (PySpark):
  - Detect duplicates:
    dup_df = df.groupBy("sid").count().filter(col("count") > 1)
    dup_rate = dup_df.count() / df.select("sid").distinct().count()
    assert dup_rate == 0, f"Duplicate sid rate: {dup_rate:.6f}"
  - Optional dedup (keep most recent):
    from pyspark.sql.functions import row_number
    from pyspark.sql.window import Window

    window = Window.partitionBy("sid").orderBy(col("last_ts").desc())
    df_dedup = df.withColumn("rn", row_number().over(window)).filter(col("rn") == 1).drop("rn")

3) Domain and Range Constraints
- Assertion:
  - label ∈ {0, 1}
  - age ∈ [0, 120] (adjust threshold to business context)
  - ctr ∈ [0.0, 1.0]
  - last_ts ≤ current time (no future timestamps)
- Rationale: Catches impossible or out-of-contract values that indicate upstream defects or parsing issues.
- Example (PySpark):
    from pyspark.sql.functions import current_timestamp

    invalid = df.filter(
      (~col("label").isin(0, 1)) |
      (col("age").isNotNull() & ((col("age") < 0) | (col("age") > 120))) |
      (col("ctr").isNotNull() & ((col("ctr") < 0.0) | (col("ctr") > 1.0))) |
      (col("last_ts") > current_timestamp())
    )
    invalid_cnt = invalid.count()  # count once to avoid re-running the filter
    assert invalid_cnt == 0, f"Domain/range violations: {invalid_cnt}"

4) Timeliness and Coverage (Freshness within Training Window)
- Assertion:
  - The dataset’s max(last_ts) is within the expected SLA window (e.g., within 24 hours of now for daily training).
  - The dataset covers the intended training period (e.g., min(last_ts) and max(last_ts) span expected window).
- Rationale: Ensures the model trains on timely and representative data; stale inputs degrade performance.
- Example (PySpark):
    from pyspark.sql.functions import max as spark_max, min as spark_min
    import datetime

    stats = df.agg(spark_min("last_ts").alias("min_ts"), spark_max("last_ts").alias("max_ts")).collect()[0]
    now = datetime.datetime.utcnow()
    freshness_ok = (now - stats["max_ts"]).total_seconds() <= 24 * 3600  # 24h SLA
    assert freshness_ok, f"Data not fresh. max(last_ts)={stats['max_ts']} UTC"

    # Optional coverage check for a 7-day window (data covering 7 calendar days yields a
    # max-min delta just under 7 days, hence the >= 6 comparison)
    coverage_ok = (stats["max_ts"] - stats["min_ts"]).days >= 6
    assert coverage_ok, f"Insufficient coverage: {stats['min_ts']}..{stats['max_ts']}"

5) Missingness and Distribution Stability (Drift Detection)
- Assertion:
  - Missingness thresholds:
    - age null rate ≤ 1% (example)
    - ctr null rate ≤ 1% (example)
  - Distribution stability vs. a baseline snapshot (e.g., previous day/week):
    - label rate within expected band (e.g., 0.3–0.7 if historically balanced)
    - ctr mean/variance within tolerance
    - age distribution similarity (use PSI or a KS test; a PSI sketch follows the example below)
- Rationale: Detects upstream shifts and silent failures that maintain schema but change data characteristics.
- Example (PySpark):
    from pyspark.sql.functions import mean, stddev

    total = df.count()
    age_null_rate = df.filter(col("age").isNull()).count() / total
    ctr_null_rate = df.filter(col("ctr").isNull()).count() / total
    assert age_null_rate <= 0.01, f"age null rate too high: {age_null_rate:.4f}"
    assert ctr_null_rate <= 0.01, f"ctr null rate too high: {ctr_null_rate:.4f}"

    label_rate = df.filter(col("label") == 1).count() / total
    ctr_stats = df.select(mean("ctr").alias("ctr_mean"), stddev("ctr").alias("ctr_std")).collect()[0]

    # Compare against stored baselines (example, load from a configuration table or file)
    # baseline = {"label_rate": 0.5, "ctr_mean": 0.08, "ctr_std": 0.12, "label_rate_tol": 0.05, "ctr_mean_tol": 0.01, "ctr_std_tol": 0.02}
    # assert abs(label_rate - baseline["label_rate"]) <= baseline["label_rate_tol"], "Label rate drift"
    # assert abs(ctr_stats["ctr_mean"] - baseline["ctr_mean"]) <= baseline["ctr_mean_tol"], "CTR mean drift"
    # assert abs(ctr_stats["ctr_std"] - baseline["ctr_std"]) <= baseline["ctr_std_tol"], "CTR std drift"
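
- PSI sketch (referenced in the assertion list above): an illustrative example only. The bin edges and baseline proportions below are hypothetical placeholders; in practice, persist them from a reference snapshot (e.g., last week's training data). It reuses df from the example above.
    import math
    from pyspark.sql import functions as F

    # Hypothetical baseline: age bin edges and the share of rows in each bin
    bins = [0, 18, 25, 35, 50, 65, 121]
    baseline_props = [0.05, 0.20, 0.30, 0.25, 0.15, 0.05]

    # Bucketize age (the domain check above already constrains age to [0, 120])
    bucket = F.when(F.col("age") < bins[1], 0)
    for i in range(1, len(bins) - 1):
        bucket = bucket.when(F.col("age") < bins[i + 1], i)

    rows = (df.filter(F.col("age").isNotNull() & (F.col("age") >= bins[0]) & (F.col("age") < bins[-1]))
              .withColumn("bucket", bucket)
              .groupBy("bucket").count()
              .collect())
    total = sum(r["count"] for r in rows)
    actual_props = {r["bucket"]: r["count"] / total for r in rows}

    # PSI = sum over bins of (actual - expected) * ln(actual / expected)
    eps = 1e-6  # smoothing to avoid log(0)
    psi = sum((actual_props.get(i, 0.0) - expected)
              * math.log((actual_props.get(i, 0.0) + eps) / (expected + eps))
              for i, expected in enumerate(baseline_props))
    assert psi <= 0.2, f"age distribution drift: PSI={psi:.4f}"  # 0.1-0.2 is a common warning band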

Notes for productionization:
- Run these checks as part of your pipeline’s validation stage and fail fast with clear incident logs.
- Persist metrics (null rates, duplicate rates, distribution stats) to a data quality table for trend analysis; see the sketch after this list.
- Calibrate thresholds to historical data and business requirements; start with conservative bounds and refine over time.
- Where duplicates are expected, define and apply deterministic deduplication logic before training.
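
A minimal sketch of the metric persistence mentioned above (illustrative only; it assumes the spark session, df, and the metric variables computed in the example, plus a hypothetical metrics table named dq_metrics; adjust names and schema to your platform):

    from datetime import datetime, timezone

    run_ts = datetime.now(timezone.utc).isoformat()
    metrics = [
        ("clicks", "age_null_rate", float(age_null_rate), run_ts),
        ("clicks", "ctr_null_rate", float(ctr_null_rate), run_ts),
        ("clicks", "dup_sid_rate", float(dup_rate), run_ts),
        ("clicks", "label_rate", float(label_rate), run_ts),
        ("clicks", "ctr_mean", float(ctr_stats["ctr_mean"]), run_ts),
        ("clicks", "ctr_std", float(ctr_stats["ctr_std"]), run_ts),
    ]
    (spark.createDataFrame(metrics, "dataset string, metric string, value double, run_ts string")
          .write.mode("append")
          .saveAsTable("dq_metrics"))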

Target users

Data engineers

When building or refactoring data pipelines, quickly produce checklists covering missing values, anomalies, duplicates, consistency, and timeliness for pre-production validation and release approval. When incidents occur, locate the problem step by step and shorten recovery time.

Data analysts / BI engineers

When reports look wrong or metrics fluctuate, get a clear verification path and data-extraction consistency checks to keep metric definitions stable. Generate readable explanations that make it easy to communicate with the business, reproduce issues, and correct them.

Machine learning engineers

During training-set and feature construction, generate sample-quality checks and remediation suggestions to keep data stable and improve model performance. Multi-language explanations support team collaboration.

Data governance leads / compliance auditors

Build up standardized inspection templates and establish periodic check routines and remediation lists covering key systems and table-level data. Use the results for audit trails and compliance reports to reduce risk.

Product managers / growth teams

Before and after instrumentation launches, define acceptance steps for event data, checking definition consistency and funnel completeness to avoid strategy failures caused by misread data. Quickly produce retrospective takeaways and improvement plans.

Operations managers

Quickly reconcile key data such as campaign settlements, orders, and inventory to catch errors and gaps early, reducing payouts, complaints, and losses. Use the check results for cross-department collaboration.

Problems it solves

Lets any data team, after supplying a dataset's key attributes, immediately receive a tailored set of "five major data quality checks" presented in a clear, executable, reviewable structure that is easy to put into practice. Expert-level, multi-language output helps standardize quality-check processes, reduces repeated communication and rework, and lowers the business risk caused by data issues. Enter the dataset's key information and quickly generate a fitted checklist and action recommendations, moving the team from reactive debugging to proactive prevention.

Feature summary

Generate, in one click, a quality checklist matched to the dataset's attributes, with explicit rules, metrics, and actionable steps.
Automatically identify missing, anomalous, duplicate, and inconsistent data, with executable remediation suggestions and verification methods.
Support multi-language output for cross-team sharing and reporting, cutting communication cost and closing issues faster.
Organize checks around the business process, covering risk points across ingestion, transformation, storage, and data retrieval.
Provide structured reports and result summaries to support fast decisions, prioritization, and owner follow-up.
Flexibly tune the depth and focus of checks per scenario, fitting e-commerce, finance, advertising, and other businesses.
Offer ready-to-use inspection checklists before launches and after pipeline changes, lowering incident probability and protecting key metrics.
Easily establish shared quality standards and terminology across the team, improving collaboration and reducing rework and disputes.
Quickly generate retrospective takeaways from historical issues, accumulate best practices, and continuously improve data trustworthiness.
Reusable as teaching and training material, helping new team members quickly learn the quality mindset and common troubleshooting techniques.

How to use the purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (e.g., ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight scenarios.

2. Publish it as an API and call it programmatically

Turn the prompt template into an API: your program can modify the template parameters as needed and call it directly through the interface, enabling automation and batch processing. Suitable for developer integration and embedding into business systems; a hypothetical call sketch follows.
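
A hypothetical sketch of what such an API call might look like in Python (everything platform-specific here, including the endpoint URL, header name, and payload field layout, is an assumption for illustration; follow the platform's actual API documentation):

  import requests  # third-party HTTP client; pip install requests

  # Hypothetical endpoint and credential placeholders
  API_URL = "https://example-prompt-platform.com/api/v1/prompt-templates/<template_id>/run"
  API_KEY = "YOUR_API_KEY"

  payload = {
      # The template's two adjustable parameters
      "数据集属性": "events (id INT, ts TIMESTAMP, src STRING, val DOUBLE; partitioned by dt)",
      "输出语言": "English",
  }
  resp = requests.post(API_URL, json=payload,
                       headers={"Authorization": f"Bearer {API_KEY}"},
                       timeout=30)
  resp.raise_for_status()
  print(resp.json())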

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so that your AI application can call the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts work seamlessly across different AI tools.

¥15.00
The platform offers a free trial so you can confirm the results meet your expectations before paying.

What you get after purchase

The complete prompt template
- 231 tokens in total
- 2 adjustable parameters
{ 数据集属性 } { 输出语言 } (dataset attributes, output language)
Automatically added to "My Prompt Library"
- Prompt optimizer support
- Versioned management support
Community-shared application examples
Free for a limited time
