Summarize five methods for dataset quality checks and provide professional technical recommendations.
Below are five categories of data quality checks for the dataset events (fields: id INT, ts TIMESTAMP, src STRING, val DOUBLE; partitioned by dt). Each category includes its objective, example rules, and an executable example SQL query (written in Spark SQL/Hive SQL style; adjust as needed).

1) Partition and timestamp consistency check
- Objective: ensure the partition key dt is consistent with the record timestamp ts, so cross-day records do not land in the wrong partition.
- Example rules:
  - Records must satisfy to_date(ts) = to_date(dt).
  - ts must not be NULL or unparseable.
- Example SQL:

  -- Count of anomalous records (per partition)
  SELECT dt, COUNT(*) AS bad_cnt
  FROM events
  WHERE ts IS NULL OR to_date(ts) <> to_date(dt)
  GROUP BY dt;

  -- Rate monitoring (set a threshold, e.g. bad_rate < 0.1%)
  SELECT dt,
         SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) AS bad_cnt,
         COUNT(*) AS total_cnt,
         SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS bad_rate
  FROM events
  GROUP BY dt;

2) Required-field completeness check (non-null and parseable)
- Objective: key fields must be present and usable in their expected types.
- Example rules:
  - id, ts, and src are required; whether val is required depends on the business definition (NULL is allowed in this example).
  - String fields must not be empty after trimming leading/trailing whitespace.
- Example SQL:

  SELECT dt,
         SUM(CASE WHEN id IS NULL THEN 1 ELSE 0 END) AS id_nulls,
         SUM(CASE WHEN ts IS NULL THEN 1 ELSE 0 END) AS ts_nulls,
         SUM(CASE WHEN src IS NULL OR trim(src) = '' THEN 1 ELSE 0 END) AS src_nulls
  FROM events
  GROUP BY dt;

  -- Optionally add parseability checks (if the raw column is a string, the CAST must succeed).
  -- Example: if ts arrives as STRING, pre-validate with TRY_CAST or a regular expression (exact functions depend on the engine).

3) Uniqueness / duplicate-record check
- Objective: identify duplicate events so downstream aggregation and auditing stay accurate.
- Example rules:
  - Define the business key (confirm it with the business owner). Common candidates: (id, ts, src), (id, src), or (id, dt, src).
  - The example below uses (id, ts, src); make it configurable via parameters.
- Example SQL:

  -- Duplicate rows by business key
  SELECT dt, id, ts, src, COUNT(*) AS dup_cnt
  FROM events
  GROUP BY dt, id, ts, src
  HAVING COUNT(*) > 1;

  -- Or a quick partition-level duplicate-rate estimate
  SELECT dt,
         COUNT(*) AS total_cnt,
         COUNT(DISTINCT CONCAT_WS('|', CAST(id AS STRING), CAST(ts AS STRING), src)) AS distinct_cnt,
         1 - COUNT(DISTINCT CONCAT_WS('|', CAST(id AS STRING), CAST(ts AS STRING), src)) / COUNT(*) AS dup_rate
  FROM events
  GROUP BY dt;

4) Value-range and referential-integrity check
- Objective: numeric fields fall within acceptable ranges; categorical fields belong to a controlled dictionary.
- Example rules:
  - val is valid: not NULL, not NaN, and within the business thresholds (example: val BETWEEN min_val AND max_val; confirm thresholds with the business).
  - src belongs to a whitelist or to the valid codes in the dimension table dim_src(src_code).
- Example SQL:

  -- val range check (example thresholds -1e6 to 1e6)
  SELECT dt,
         SUM(CASE WHEN val IS NULL OR isnan(val) OR val < -1000000 OR val > 1000000 THEN 1 ELSE 0 END) AS bad_val_cnt,
         COUNT(*) AS total_cnt,
         SUM(CASE WHEN val IS NULL OR isnan(val) OR val < -1000000 OR val > 1000000 THEN 1 ELSE 0 END) / COUNT(*) AS bad_val_rate
  FROM events
  GROUP BY dt;

  -- src referential integrity (find unknown sources)
  -- Requires the dimension table dim_src(src_code)
  SELECT e.dt, e.src, COUNT(*) AS unknown_src_cnt
  FROM events e
  LEFT JOIN dim_src d ON e.src = d.src_code
  WHERE d.src_code IS NULL
  GROUP BY e.dt, e.src;

5) Freshness and volume-baseline check
- Objective: ensure partitions arrive on time and row counts stay within a reasonable range, so delays and large swings are caught early.
- Example rules:
  - Today's partition should exist by T+0 HH:MM (per the scheduling agreement); alert if it has not arrived.
  - Compare row counts against the last 7 days' baseline and alert when the deviation exceeds a threshold (e.g. ±20%).
- Example SQL:

  -- Detect expected partitions over the last N days (7 days here; Spark can generate the date series with sequence)
  WITH cal AS (
    SELECT explode(sequence(date_sub(current_date(), 6), current_date())) AS dt
  ),
  arrived AS (
    SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
    FROM events
    WHERE to_date(dt) BETWEEN date_sub(current_date(), 6) AND current_date()
    GROUP BY to_date(dt)
  )
  SELECT c.dt,
         COALESCE(a.row_cnt, 0) AS row_cnt,
         CASE WHEN a.row_cnt IS NULL THEN 'MISSING_PARTITION' ELSE 'OK' END AS status
  FROM cal c
  LEFT JOIN arrived a USING (dt);

  -- Today's row count vs. the last-7-day baseline (median of the last 7 days' row_cnt)
  WITH hist AS (
    SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
    FROM events
    WHERE to_date(dt) BETWEEN date_sub(current_date(), 7) AND date_sub(current_date(), 1)
    GROUP BY to_date(dt)
  ),
  today AS (
    SELECT to_date(dt) AS dt, COUNT(*) AS row_cnt
    FROM events
    WHERE to_date(dt) = current_date()
    GROUP BY to_date(dt)
  ),
  baseline AS (
    SELECT percentile_approx(row_cnt, 0.5) AS p50_last7 FROM hist
  )
  SELECT t.dt,
         t.row_cnt AS today_cnt,
         b.p50_last7,
         CASE WHEN t.row_cnt < 0.8 * b.p50_last7 THEN 'LOW_VOLUME'
              WHEN t.row_cnt > 1.2 * b.p50_last7 THEN 'HIGH_VOLUME'
              ELSE 'OK' END AS volume_status
  FROM today t
  CROSS JOIN baseline b;

Implementation recommendations
- Parameterize the rules above (required-field set, business key, val thresholds, src whitelist, deviation thresholds, deadlines, etc.) and store them in a configuration table so they can be reused across tables.
- Orchestrate them as data quality jobs (e.g. Airflow/Argo/your scheduler) and write the results to an audit table dq_results whose fields include table name, partition, rule ID, metric value, threshold, status, and run time (see the sketch after these recommendations).
- Trigger alerts for failed rules (Slack/email/alerting platform) and, depending on severity, either block downstream jobs or allow a degraded release.
- For duplicates and out-of-range values, consider automated remediation (e.g. dedup-and-rewrite, or isolation into a quarantine table with the error reason attached).
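To make the audit-table recommendation concrete, here is a minimal PySpark sketch that runs rule 1 and appends its result to dq_results. It assumes a SparkSession with the events table registered; the dq_results column names and the rule ID are illustrative placeholders, not a prescribed schema.

  from pyspark.sql import SparkSession, functions as F

  spark = SparkSession.builder.getOrCreate()

  THRESHOLD = 0.001  # bad_rate < 0.1%, as in rule 1

  # Compute the per-partition bad rate for rule 1.
  metrics = spark.sql("""
      SELECT dt,
             SUM(CASE WHEN ts IS NULL OR to_date(ts) <> to_date(dt) THEN 1 ELSE 0 END) / COUNT(*) AS bad_rate
      FROM events
      GROUP BY dt
  """)

  # Shape one audit row per partition: table, partition, rule ID, metric, threshold, status, run time.
  result = metrics.select(
      F.lit("events").alias("table_name"),
      F.col("dt").alias("partition_value"),
      F.lit("R1_partition_ts_consistency").alias("rule_id"),
      F.col("bad_rate").alias("metric_value"),
      F.lit(THRESHOLD).alias("threshold"),
      F.when(F.col("bad_rate") < THRESHOLD, "PASS").otherwise("FAIL").alias("status"),
      F.current_timestamp().alias("run_time"),
  )

  result.write.mode("append").saveAsTable("dq_results")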
Below are five data quality checks for a “paid” dataset derived from the table orders with fields oid (INT), uid (INT), pay_ts (TIMESTAMP), and amt (DECIMAL). Each check includes its objective, rule, and an example SQL query (ANSI SQL) applicable to most data warehouses.

1) Paid-definition compliance and required-field completeness
- Objective: ensure the dataset strictly adheres to the “paid” definition and contains required values.
- Rule:
  - Include only rows where pay_ts IS NOT NULL.
  - amt must be positive.
  - oid and uid must be present (NOT NULL).
- Example SQL:

  SELECT
    SUM(CASE WHEN pay_ts IS NULL THEN 1 ELSE 0 END) AS violations_missing_pay_ts,
    SUM(CASE WHEN amt IS NULL OR amt <= 0 THEN 1 ELSE 0 END) AS violations_amt_nonpositive_or_null,
    SUM(CASE WHEN oid IS NULL THEN 1 ELSE 0 END) AS violations_missing_oid,
    SUM(CASE WHEN uid IS NULL THEN 1 ELSE 0 END) AS violations_missing_uid
  FROM orders;

  -- Optional hard-filter audit (ensures no unpaid rows leak into the “paid” dataset)
  SELECT COUNT(*) AS unpaid_rows_present
  FROM orders
  WHERE pay_ts IS NULL;

2) Primary-key uniqueness (duplicate detection)
- Objective: prevent duplicate paid orders for the same oid.
- Rule: each oid appears at most once in the paid dataset.
- Example SQL:

  SELECT oid, COUNT(*) AS dup_count
  FROM orders
  WHERE pay_ts IS NOT NULL
  GROUP BY oid
  HAVING COUNT(*) > 1;

  -- If duplicates exist, inspect the conflicting values
  SELECT o.*
  FROM (
    SELECT oid
    FROM orders
    WHERE pay_ts IS NOT NULL
    GROUP BY oid
    HAVING COUNT(*) > 1
  ) d
  JOIN orders o USING (oid)
  ORDER BY o.oid, o.pay_ts;

3) Timestamp validity (temporal sanity)
- Objective: validate pay_ts against reasonable temporal bounds to catch clock, timezone, or ingestion errors.
- Rule:
  - pay_ts must not be in the future beyond a small tolerance.
  - pay_ts must be later than a configured lower bound (e.g., the system go-live date).
- Example SQL (using a 5-minute future tolerance and a configurable lower bound):

  WITH params AS (
    SELECT CURRENT_TIMESTAMP AS now_ts,
           TIMESTAMP '2020-01-01 00:00:00' AS lower_bound_ts
  )
  SELECT COUNT(*) AS invalid_timestamps
  FROM orders o
  CROSS JOIN params p
  WHERE o.pay_ts IS NOT NULL
    AND (o.pay_ts > p.now_ts + INTERVAL '5' MINUTE
         OR o.pay_ts < p.lower_bound_ts);

4) Referential integrity of uid (if a user dimension exists)
- Objective: ensure that uid in orders maps to a valid user record.
- Rule: for paid rows, uid must exist in the users dimension (or other source of truth).
- Example SQL:

  SELECT COUNT(*) AS missing_users
  FROM orders o
  LEFT JOIN dim_users u ON o.uid = u.uid
  WHERE o.pay_ts IS NOT NULL
    AND u.uid IS NULL;

  -- If there is a known set of valid uids (e.g., from a lookup table), use that instead of dim_users.

5) Completeness and reconciliation of paid counts/amounts (source vs. curated)
- Objective: detect loss or duplication between source events and the curated paid dataset.
- Rule: for each date, paid counts and sums should reconcile within a defined tolerance against the authoritative source.
- Example SQL (assuming a source table orders_src contains raw payment events):

  WITH src AS (
    SELECT CAST(pay_ts AS DATE) AS dt, COUNT(*) AS src_cnt, SUM(amt) AS src_amt
    FROM orders_src
    WHERE pay_ts IS NOT NULL
    GROUP BY CAST(pay_ts AS DATE)
  ),
  tgt AS (
    SELECT CAST(pay_ts AS DATE) AS dt, COUNT(*) AS tgt_cnt, SUM(amt) AS tgt_amt
    FROM orders
    WHERE pay_ts IS NOT NULL
    GROUP BY CAST(pay_ts AS DATE)
  )
  SELECT s.dt, s.src_cnt, t.tgt_cnt, s.src_amt, t.tgt_amt,
         (t.tgt_cnt - s.src_cnt) AS diff_cnt,
         (t.tgt_amt - s.src_amt) AS diff_amt
  FROM src s
  JOIN tgt t USING (dt)
  WHERE ABS(t.tgt_cnt - s.src_cnt) > 0
     OR ABS(t.tgt_amt - s.src_amt) > 0;

Implementation notes:
- If orders_src or dim_users is not available, substitute the appropriate upstream/source-of-truth tables for the reconciliation and referential-integrity checks.
- Thresholds (future tolerance, lower-bound dates, amount maxima) should be parameterized and aligned with business SLAs and known system constraints (see the sketch after these notes).
- These checks can be automated via scheduled jobs or embedded into a data quality framework (e.g., Great Expectations, dbt tests, or Airflow sensors) and should emit metrics to monitoring/alerting systems.
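As one way to realize the parameterization suggested above, the sketch below wires the check-3 tolerances into a single configuration object and fails the job when the rule is violated. It assumes the query is executed through Spark SQL with the orders table registered; the threshold values are placeholders, and the execution layer (JDBC client, warehouse SDK, dbt, etc.) should be swapped for whatever your stack uses.

  from pyspark.sql import SparkSession

  spark = SparkSession.builder.getOrCreate()

  # Placeholder thresholds; align with business SLAs as noted above.
  CHECK_PARAMS = {
      "future_tolerance_minutes": 5,            # check 3: allowed clock skew
      "lower_bound_ts": "2020-01-01 00:00:00",  # check 3: system go-live date
  }

  def invalid_timestamp_count(params):
      # Re-runs the timestamp-validity rule (check 3) with injected parameters.
      sql = f"""
          SELECT COUNT(*) AS invalid_timestamps
          FROM orders
          WHERE pay_ts IS NOT NULL
            AND (pay_ts > CURRENT_TIMESTAMP + INTERVAL {params['future_tolerance_minutes']} MINUTE
                 OR pay_ts < TIMESTAMP '{params['lower_bound_ts']}')
      """
      return spark.sql(sql).collect()[0]["invalid_timestamps"]

  if invalid_timestamp_count(CHECK_PARAMS) > 0:
      raise ValueError("Timestamp validity check failed for the paid dataset")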
Below are five practical data quality checks tailored for the training dataset clicks with fields sid (int), label (binary), age (int), last_ts (timestamp), and ctr (double). Each check includes what to assert and an example implementation (PySpark) you can embed in a data validation job.

1) Schema and mandatory-field conformance
- Assertion:
  - Columns exist with the expected types: sid:int, label:int (binary), age:int, last_ts:timestamp, ctr:double.
  - Mandatory fields are non-null: sid, label, last_ts.
- Rationale: prevents ingestion of malformed records and ensures downstream transformations do not fail due to type or null issues.
- Example (PySpark):

  from pyspark.sql.functions import col
  from pyspark.sql.types import StructType, StructField, IntegerType, TimestampType, DoubleType

  # Enforce the schema on read
  schema = StructType([
      StructField("sid", IntegerType(), False),
      StructField("label", IntegerType(), False),
      StructField("age", IntegerType(), True),
      StructField("last_ts", TimestampType(), False),
      StructField("ctr", DoubleType(), True),
  ])
  df = spark.read.schema(schema).parquet(".../clicks")

  # Null check on mandatory fields
  violations = df.filter(
      col("sid").isNull() | col("label").isNull() | col("last_ts").isNull()
  ).count()
  assert violations == 0, f"Nulls found in mandatory fields: {violations}"

2) Identifier uniqueness and duplicate detection
- Assertion:
  - If sid is expected to uniquely identify training records, enforce uniqueness (no duplicate sids).
  - If duplicates can occur (e.g., multiple events per sid), flag the duplicate rate and apply a deterministic dedup rule (e.g., keep the latest last_ts).
- Rationale: duplicate records distort distributions and can bias model training.
- Example (PySpark):

  from pyspark.sql import Window
  from pyspark.sql.functions import col, row_number

  # Detect duplicates
  dup_df = df.groupBy("sid").count().filter(col("count") > 1)
  dup_rate = dup_df.count() / df.select("sid").distinct().count()
  assert dup_rate == 0, f"Duplicate sid rate: {dup_rate:.6f}"

  # Optional dedup (keep the most recent record per sid)
  window = Window.partitionBy("sid").orderBy(col("last_ts").desc())
  df_dedup = df.withColumn("rn", row_number().over(window)).filter(col("rn") == 1).drop("rn")

3) Domain and range constraints
- Assertion:
  - label ∈ {0, 1}
  - age ∈ [0, 120] (adjust thresholds to the business context)
  - ctr ∈ [0.0, 1.0]
  - last_ts ≤ current time (no future timestamps)
- Rationale: catches impossible or out-of-contract values that indicate upstream defects or parsing issues.
- Example (PySpark):

  from pyspark.sql.functions import col, current_timestamp

  invalid = df.filter(
      (~col("label").isin(0, 1))
      | (col("age").isNotNull() & ((col("age") < 0) | (col("age") > 120)))
      | (col("ctr").isNotNull() & ((col("ctr") < 0.0) | (col("ctr") > 1.0)))
      | (col("last_ts") > current_timestamp())
  )
  assert invalid.count() == 0, f"Domain/range violations: {invalid.count()}"

4) Timeliness and coverage (freshness within the training window)
- Assertion:
  - The dataset's max(last_ts) is within the expected SLA window (e.g., within 24 hours of now for daily training).
  - The dataset covers the intended training period (e.g., min(last_ts) and max(last_ts) span the expected window).
- Rationale: ensures the model trains on timely and representative data; stale inputs degrade performance.
- Example (PySpark):

  import datetime
  from pyspark.sql.functions import max as spark_max, min as spark_min

  stats = df.agg(spark_min("last_ts").alias("min_ts"), spark_max("last_ts").alias("max_ts")).collect()[0]
  now = datetime.datetime.utcnow()
  freshness_ok = (now - stats["max_ts"]).total_seconds() <= 24 * 3600  # 24h SLA
  assert freshness_ok, f"Data not fresh. max(last_ts)={stats['max_ts']} UTC"

  # Optional coverage check for a 7-day window
  coverage_ok = (stats["max_ts"] - stats["min_ts"]).days >= 7
  assert coverage_ok, f"Insufficient coverage: {stats['min_ts']}..{stats['max_ts']}"

5) Missingness and distribution stability (drift detection)
- Assertion:
  - Missingness thresholds:
    - age null rate ≤ 1% (example)
    - ctr null rate ≤ 1% (example)
  - Distribution stability vs. a baseline snapshot (e.g., previous day/week):
    - label rate within an expected band (e.g., 0.3–0.7 if historically balanced)
    - ctr mean/variance within tolerance
    - age distribution similarity (use PSI or a KS test; a PSI sketch follows the productionization notes below)
- Rationale: detects upstream shifts and silent failures that preserve the schema but change data characteristics.
- Example (PySpark):

  from pyspark.sql.functions import col, mean, stddev

  total = df.count()
  age_null_rate = df.filter(col("age").isNull()).count() / total
  ctr_null_rate = df.filter(col("ctr").isNull()).count() / total
  assert age_null_rate <= 0.01, f"age null rate too high: {age_null_rate:.4f}"
  assert ctr_null_rate <= 0.01, f"ctr null rate too high: {ctr_null_rate:.4f}"

  label_rate = df.filter(col("label") == 1).count() / total
  ctr_stats = df.select(mean("ctr").alias("ctr_mean"), stddev("ctr").alias("ctr_std")).collect()[0]

  # Compare against stored baselines (example: load from a configuration table or file)
  # baseline = {"label_rate": 0.5, "ctr_mean": 0.08, "ctr_std": 0.12,
  #             "label_rate_tol": 0.05, "ctr_mean_tol": 0.01, "ctr_std_tol": 0.02}
  # assert abs(label_rate - baseline["label_rate"]) <= baseline["label_rate_tol"], "Label rate drift"
  # assert abs(ctr_stats["ctr_mean"] - baseline["ctr_mean"]) <= baseline["ctr_mean_tol"], "CTR mean drift"
  # assert abs(ctr_stats["ctr_std"] - baseline["ctr_std"]) <= baseline["ctr_std_tol"], "CTR std drift"

Notes for productionization:
- Run these checks as part of your pipeline's validation stage and fail fast with clear incident logs.
- Persist metrics (null rates, duplicate rates, distribution stats) to a data quality table for trend analysis.
- Calibrate thresholds to historical data and business requirements; start with conservative bounds and refine over time.
- Where duplicates are expected, define and apply deterministic deduplication logic before training.
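For the age-drift assertion in check 5, here is a minimal PSI sketch. It assumes baseline_df is a prior snapshot of clicks with the same schema and df is the current dataset; the bin count and the 0.2 alert threshold are illustrative rule-of-thumb values, not part of the checks above.

  import math
  from pyspark.sql import functions as F

  def psi(baseline_df, current_df, column="age", bins=10, eps=1e-6):
      # Derive bin edges from the baseline distribution (approxQuantile ignores nulls).
      probs = [i / bins for i in range(1, bins)]
      edges = baseline_df.approxQuantile(column, probs, 0.01)
      edges = [float("-inf")] + sorted(set(edges)) + [float("inf")]

      def bin_fractions(df):
          total = df.filter(F.col(column).isNotNull()).count()
          fracs = []
          for lo, hi in zip(edges[:-1], edges[1:]):
              cnt = df.filter((F.col(column) > lo) & (F.col(column) <= hi)).count()
              fracs.append(max(cnt / max(total, 1), eps))  # eps avoids log(0)
          return fracs

      p = bin_fractions(baseline_df)
      q = bin_fractions(current_df)
      # PSI = sum over bins of (expected% - actual%) * ln(expected% / actual%)
      return sum((pi - qi) * math.log(pi / qi) for pi, qi in zip(p, q))

  age_psi = psi(baseline_df, df, column="age")
  assert age_psi < 0.2, f"age distribution drift detected: PSI={age_psi:.3f}"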
When building or refactoring data pipelines, quickly produce a checklist covering missing values, anomalies, duplicates, consistency, and timeliness for pre-production validation and release approval. When incidents occur, follow the steps to locate the problem and shorten recovery time.
When reports look wrong or metrics fluctuate, get a clear investigation path plus consistency checks on the underlying queries, keeping metric definitions stable. Generate readable explanations that make it easy to communicate with the business, reproduce issues, and correct them.
During training-set and feature construction, generate sample-quality checks and remediation suggestions to keep data stable and improve model performance. Multilingual explanations make team collaboration easier.
Accumulate standardized inspection templates and establish periodic check mechanisms and remediation lists covering key systems and table-level data. Use them for audit trails and compliance reporting to reduce risk.
Before and after event-tracking launches, define acceptance steps for event data, checking definition consistency and funnel completeness so that misread data does not invalidate strategy. Quickly produce retrospective takeaways and improvement plans.
Quickly reconcile key data such as campaign settlements, orders, and inventory to catch errors and gaps early, reducing payouts, complaints, and losses. Use the check results to support cross-department collaboration.
Lets any data team, after supplying a dataset's key attributes, immediately receive a tailored set of "five data quality checks" presented in a clear, executable, reviewable structure that is easy to put into practice. Expert-level, cross-language output helps standardize the quality-check process, reduces repeated communication and rework, and lowers the business risk caused by data issues. Enter your dataset's key information now to quickly generate a fitted checklist and action recommendations, moving the team from reactive troubleshooting to proactive prevention.
Copy the prompt generated from the template into your usual chat application (such as ChatGPT or Claude) and use it directly in conversation, with no extra development required. Suitable for individual quick trials and lightweight use cases.
Turn the prompt template into an API: your program can modify template parameters freely and call it directly through the interface, making automation and batch processing easy. Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so that your AI application calls the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts work seamlessly across different AI tools.