¥
立即购买

数据列验证规则生成器

361 浏览
35 试用
9 购买
Dec 18, 2025更新

为指定数据表的特定列,生成专业、可执行的数据验证规则。适用于数据清洗、ETL管道构建、数据质量监控等场景,通过自定义验证类型与详细说明,确保数据的一致性与准确性,提升数据工程效率。

主题:fact_orders.order_date 日期格式校验规则与检查步骤(格式校验)

一、验证规则(严格执行)

  • 允许格式:仅 YYYY-MM-DD,必须包含前导零(如 2024-07-01)。
  • 字符限制:仅数字与连字符,禁止时间成分、时区、斜杠、点号等(如 2024/07/01、2024.07.01、2024-07-01T12:00 均判定为违规)。
  • 日历有效性:必须为有效公历日期,正确处理闰年(如 2024-02-29 合法)。
  • 合法区间:日期在 [2000-01-01, 2099-12-31] 且不晚于批处理 T 日。
  • 空值与占位符:不允许 NULL、全空白、“0000-00-00”、占位符(N/A、NA、NULL、null、-、— 等),首尾空格需先 trim 再判断。
  • 正则初筛:^\d{4}-\d{2}-\d{2}$

二、检查流程与判定逻辑 步骤顺序(先 trim 再校验),并输出违规类型 violation_type:

  1. 为空/占位符检查:
    • 为空或空白:NULL_OR_BLANK
    • 占位符(大小写不敏感):PLACEHOLDER(N/A、NA、NULL、-、— 等)
  2. 正则初筛(A):不匹配 ^\d{4}-\d{2}-\d{2}$ → INVALID_FORMAT
  3. 日历有效性(B):按 yyyy-MM-dd 转换为日期类型,解析失败 → INVALID_CALENDAR_DATE
  4. 合法区间(C-1):不在 [2000-01-01, 2099-12-31] → OUT_OF_RANGE
  5. 不晚于处理日(C-2):解析日期 > 批处理 T 日 → FUTURE_DATE
  6. 通过以上全部检查为合法记录;否则按首次命中的违规类型记录。

三、统计输出(D)

  • total_rows:总行数
  • bad_rows:违规行数
  • bad_ratio:违规占比 = bad_rows / total_rows
  • samples:违规样例(order_id, 原始值, 违规类型,最多 100 条)

四、异常处理与阻断策略

  • 违规落地:写入表 bad_records.fact_orders_order_date,字段
    • order_id, order_date_raw(原始值,trim 前), violation_type, detected_at(检测批次时间戳)
  • 阻断策略:
    • bad_ratio > 0.1%:阻断下游分区装载并通知值班
    • 否则:继续发布但告警
  • 修复建议:
    • 入湖/入仓前统一日期标准并补齐前导零;历史脏数据回补分区键,避免分区错乱与口径偏差。

五、示例代码(可直接用于离线批处理)

A. PySpark 实现(推荐用于数据湖/仓常规批校验) 说明:

  • 支持字符串或 date/timestamp 类型的 order_date;统一转字符串后校验。
  • 通过 to_date 严格解析日历有效性;范围和 T 日均为参数化输入。
  • 输出统计与样例;写入 bad_records 并执行阻断。

from pyspark.sql import functions as F, types as T

参数

SOURCE_TABLE = "fact_orders" KEY_COL = "order_id" DATE_COL = "order_date" # 可能是 STRING/DATE/TIMESTAMP BAD_TABLE = "bad_records.fact_orders_order_date"

批处理 T 日(处理日),建议由调度注入,如 2024-07-02 校验不晚于此日

PROCESSING_DAY = F.to_date(F.lit(spark.conf.get("pipeline.run_date", "")), "yyyy-MM-dd")

若未注入,则使用当前日期

PROCESSING_DAY = F.coalesce(PROCESSING_DAY, F.current_date()) BATCH_TS = F.current_timestamp()

读取源数据

df = spark.table(SOURCE_TABLE).select(KEY_COL, DATE_COL)

统一为字符串并 trim

df1 = ( df .withColumn("order_date_raw", F.col(DATE_COL).cast("string")) .withColumn("order_date_trim", F.trim(F.col("order_date_raw"))) )

规则集合

regex = r'^\d{4}-\d{2}-\d{2}$' placeholders = F.lower(F.col("order_date_trim")).isin("n/a", "na", "null", "none", "-", "—") is_blank = (F.col("order_date_trim").isNull()) | (F.col("order_date_trim") == "") regex_ok = F.col("order_date_trim").rlike(regex)

严格日期解析(无效将得到 null)

parsed_date = F.to_date(F.col("order_date_trim"), "yyyy-MM-dd") in_range = (parsed_date >= F.to_date(F.lit("2000-01-01"))) & (parsed_date <= F.to_date(F.lit("2099-12-31"))) not_future = parsed_date <= PROCESSING_DAY

违规类型判定(按顺序)

violation_type = ( F.when(is_blank, F.lit("NULL_OR_BLANK")) .when(placeholders, F.lit("PLACEHOLDER")) .when(~regex_ok, F.lit("INVALID_FORMAT")) .when(parsed_date.isNull(), F.lit("INVALID_CALENDAR_DATE")) .when(~in_range, F.lit("OUT_OF_RANGE")) .when(~not_future, F.lit("FUTURE_DATE")) )

df2 = df1.withColumn("violation_type", violation_type)

bad_df = df2.where(F.col("violation_type").isNotNull())
.select( F.col(KEY_COL).alias("order_id"), F.col("order_date_raw"), F.col("violation_type"), BATCH_TS.alias("detected_at") )

good_df = df2.where(F.col("violation_type").isNull())

统计

total_rows = df2.count() bad_rows = bad_df.count() bad_ratio = (bad_rows / total_rows) if total_rows > 0 else 0.0

落地违规记录(建议建为外表或 Delta/Apache Iceberg/Hive 管理表)

先建表(见下方 DDL),此处直接追加

if bad_rows > 0: bad_df.write.mode("append").format("parquet").saveAsTable(BAD_TABLE)

样例(限 100 条)

samples_df = bad_df.limit(100)

输出统计(可改为写入质量指标表或推送到告警系统)

print({ "table": SOURCE_TABLE, "column": DATE_COL, "total_rows": total_rows, "bad_rows": bad_rows, "bad_ratio": bad_ratio }) samples = samples_df.toPandas() print(samples)

阻断阈值:0.1%

if bad_ratio > 0.001: raise RuntimeError(f"[BLOCKED] {SOURCE_TABLE}.{DATE_COL} bad_ratio={bad_ratio:.6%} > 0.1%")

B. Spark SQL(仅 SQL 作业或调度中使用) -- 参数 -- set pipeline.run_date='2024-07-02';

WITH src AS ( SELECT order_id, CAST(order_date AS STRING) AS order_date_raw, TRIM(CAST(order_date AS STRING)) AS order_date_trim FROM fact_orders ), marked AS ( SELECT order_id, order_date_raw, order_date_trim, CASE WHEN order_date_trim IS NULL OR order_date_trim = '' THEN 'NULL_OR_BLANK' WHEN lower(order_date_trim) IN ('n/a','na','null','none','-','—') THEN 'PLACEHOLDER' WHEN order_date_trim !~ '^[0-9]{4}-[0-9]{2}-[0-9]{2}$' THEN 'INVALID_FORMAT' WHEN TO_DATE(order_date_trim, 'yyyy-MM-dd') IS NULL THEN 'INVALID_CALENDAR_DATE' WHEN TO_DATE(order_date_trim, 'yyyy-MM-dd') < DATE '2000-01-01' OR TO_DATE(order_date_trim, 'yyyy-MM-dd') > DATE '2099-12-31' THEN 'OUT_OF_RANGE' WHEN TO_DATE(order_date_trim, 'yyyy-MM-dd') > COALESCE(TO_DATE('${pipeline.run_date}'), current_date()) THEN 'FUTURE_DATE' ELSE NULL END AS violation_type FROM src ), bad AS ( SELECT order_id, order_date_raw, violation_type, current_timestamp() AS detected_at FROM marked WHERE violation_type IS NOT NULL ), stat AS ( SELECT (SELECT COUNT() FROM marked) AS total_rows, (SELECT COUNT() FROM bad) AS bad_rows ) -- 1) 落地 bad_records INSERT INTO bad_records.fact_orders_order_date SELECT * FROM bad;

-- 2) 输出统计 SELECT total_rows, bad_rows, bad_rows / total_rows AS bad_ratio FROM stat;

-- 3) 样例(限 100) SELECT order_id, order_date_raw, violation_type FROM bad LIMIT 100;

-- 4) 阻断(若调度系统支持基于查询结果的阈值判断,可用上面的 bad_ratio 触发失败)

说明:正则操作符在 Spark SQL 可用 RLIKE;上例使用 !~ 可能在部分方言不可用,若不支持请改为 NOT (order_date_trim RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')。

C. 违规落地表 DDL(Hive/Spark 兼容) CREATE SCHEMA IF NOT EXISTS bad_records;

CREATE TABLE IF NOT EXISTS bad_records.fact_orders_order_date ( order_id STRING, order_date_raw STRING, violation_type STRING, detected_at TIMESTAMP ) USING PARQUET;

-- 如需分区(建议按检测日期) -- PARTITIONED BY (dt STRING) 并在写入时加 dt=DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd')

D. 存储层 CHECK 约束(若引擎支持) 目标:尽量在存储层阻止违规数据入表(动态“不得晚于 T 日”不适用于静态约束)。

  • Delta Lake(Databricks/开源 Delta,CHECK 在写时强制执行) -- order_date 为 STRING 列时,格式 + 静态范围 ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_format CHECK (order_date RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$');

    ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK (to_date(order_date, 'yyyy-MM-dd') >= DATE '2000-01-01' AND to_date(order_date, 'yyyy-MM-dd') <= DATE '2099-12-31');

    -- 若将字段类型定为 DATE,更稳妥(避免格式问题),则仅需范围约束 -- ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK (order_date BETWEEN DATE '2000-01-01' AND DATE '2099-12-31');

  • PostgreSQL(约束强制,推荐把字段类型定为 DATE) -- 若列类型为 DATE(推荐) ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK (order_date >= DATE '2000-01-01' AND order_date <= DATE '2099-12-31');

    -- 若列为 TEXT,需同时校验格式与可转日期(CAST 失败会阻止写入) ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_format CHECK (order_date ~ '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'); ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_cast_and_range CHECK (order_date::date >= DATE '2000-01-01' AND order_date::date <= DATE '2099-12-31');

  • ClickHouse(CONSTRAINT 在插入时强制) ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_format CHECK match(order_date, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$');

    ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK parseDateTimeBestEffortOrNull(order_date) >= toDate('2000-01-01') AND parseDateTimeBestEffortOrNull(order_date) <= toDate('2099-12-31');

备注:

  • “不得晚于 T 日”是运行时动态条件,不适合作为静态 CHECK 约束。请在调度校验中实现(如上 PySpark/SQL)。
  • 若将字段类型统一为 DATE,可从根本上消除格式混杂的风险;外部交换/导出再统一格式化为 yyyy-MM-dd。

六、实施与运维建议

  • 字段定义:优先使用 DATE 类型存储;外部接口按 ISO 8601 yyyy-MM-dd 格式输出/接收。
  • 调度巡检:每日 T+0 在分区装载前执行本校验作业;按 0.1% 阈值阻断。
  • 数据字典:将上述规则、样例合法值与常见非法样例收录至数据字典,并在上游系统对接文档中明确。
  • 入湖预处理:对来源多格式日期,ETL/CDC 入湖前统一转换并补齐前导零;对历史数据离线回补修复并重算分区键。
  • 监控与告警:将 bad_ratio、bad_rows 指标接入告警平台,按批次维度留存(可落地 data_quality.metrics 表)。

以下为表 fact_orders 的列 order_status 的“枚举值校验”可执行规则与检查步骤(含示例代码)。目标是确保该字段仅取自统一状态集,规范大小写与空白,避免未知占位与同义异写,支撑履约、退款与收入确认的一致性。

一、校验规则(可操作定义)

  • 允许值全集(去首尾空格后、统一大写判定): {PENDING, PAID, SHIPPED, DELIVERED, CANCELLED, REFUNDED, FAILED_PAYMENT}
  • 规则细则:
    1. 预处理:对原值执行 trim 和 upper,最终标准化为大写存储。
    2. 非法值:空值、空串、仅空白、NULL 字符串、未知占位(N/A、NA、NONE、-、UNKNOWN 等)均判定违规。
    3. 枚举校验:预处理后值不在允许集合即为违规。
    4. 维表约束:建立维表 dim_order_status 存放枚举与释义;入仓时进行字典对齐并仅允许维表内值写入事实表。
    5. 建议规范化管控:全链路统一使用标准化函数 normalize_status() 输出最终持久化值。

二、检查要点与统计输出

  • A) 预处理:trim + upper,剔除无意义占位。
  • B) 枚举校验:not in 列表即违规。
  • C) 统计输出:
    • 各枚举值分布(计数与占比)。
    • 未知值 TopN(原始值及其计数)。
    • 违规总数与占比(违规计数/总计数)。

三、异常处理与自动修复策略

  • 违规落库:将违规记录入库至 bad_records.fact_orders_order_status,字段建议包含: order_id, raw_order_status, normalized_status, violation_reason, recommended_mapping, source_system, etl_batch_id, processed_at。
  • 推荐映射:若能识别常见拼写或变体(如 Paid、paid、paid 、shiped、shipped ),给出 recommended_mapping。
  • 自动修复门槛:
    • 低风险(违规占比 ≤ 0.05% 且为明显小写/空格/常见拼写):在入仓修正为标准值,并记录修正日志。
    • 高风险(> 0.05% 或无法可靠映射):维持为未知分组,不写入事实表目标列;需业务确认后回填。

四、实施建议

  • 在数据接入层引入“字典映射组件”(基于维表与同义词表),在落仓前统一标准化。
  • 在报表层对未知状态设置兜底分组“UNKNOWN_STATUS”,并实时展示修复指引(TopN 未知值与建议映射)。
  • 与业务状态机和事件流对齐,设置状态跃迁校验(例如禁止从 DELIVERED 回退至 PAID)。

五、示例代码

  1. 维表与同义词表(ANSI 风格 SQL) -- 维表:标准字典 CREATE TABLE IF NOT EXISTS dim_order_status ( status_code VARCHAR(32) PRIMARY KEY, status_desc VARCHAR(256) );

MERGE INTO dim_order_status d USING ( SELECT 'PENDING' AS c, '待支付/待处理' AS s UNION ALL SELECT 'PAID', '已支付' UNION ALL SELECT 'SHIPPED', '已发货' UNION ALL SELECT 'DELIVERED', '已送达/签收' UNION ALL SELECT 'CANCELLED', '已取消' UNION ALL SELECT 'REFUNDED', '已退款' UNION ALL SELECT 'FAILED_PAYMENT', '支付失败' ) src ON d.status_code = src.c WHEN NOT MATCHED THEN INSERT (status_code, status_desc) VALUES (src.c, src.s);

-- 可选:同义词/常见拼写映射表(优先精确/正则,避免过度泛化) CREATE TABLE IF NOT EXISTS ref_order_status_synonyms ( pattern VARCHAR(64), -- 匹配模式(存放大写+去空格的形态) mapped_to VARCHAR(32), -- 目标标准值 match_type VARCHAR(16) -- EXACT 或 REGEX );

INSERT INTO ref_order_status_synonyms (pattern, mapped_to, match_type) VALUES ('PAID', 'PAID', 'EXACT'), ('PENDING', 'PENDING', 'EXACT'), ('SHIPPED', 'SHIPPED', 'EXACT'), ('DELIVERED', 'DELIVERED', 'EXACT'), ('CANCELLED', 'CANCELLED', 'EXACT'), ('REFUNDED', 'REFUNDED', 'EXACT'), ('FAILEDPAYMENT', 'FAILED_PAYMENT', 'EXACT'), -- 常见变体(存放去空格+大写后形态) ('PAID', 'PAID', 'EXACT'), ('PAYED', 'PAID', 'EXACT'), ('SHIPED', 'SHIPPED', 'EXACT'), ('SHIPPEDD', 'SHIPPED', 'EXACT'), ('CANCELED', 'CANCELLED', 'EXACT'), ('FAILED_PAYMENT', 'FAILED_PAYMENT', 'EXACT');

  1. 预处理与枚举校验(ANSI 风格 SQL) -- 参数::batch_id 可替换为实际批次号 WITH pre AS ( SELECT o., TRIM(o.order_status) AS trimmed_status, UPPER(TRIM(o.order_status)) AS upper_status, REPLACE(UPPER(TRIM(o.order_status)),' ','') AS compact_status, CASE WHEN o.order_status IS NULL THEN 'NULL' WHEN TRIM(o.order_status) = '' THEN 'EMPTY' WHEN UPPER(TRIM(o.order_status)) IN ('NULL','N/A','NA','NONE','-','UNKNOWN') THEN 'PLACEHOLDER' ELSE NULL END AS nullish_reason FROM fact_orders o ), norm AS ( -- 基于同义词进行一次映射;未命中则保留 upper_status SELECT p., COALESCE(s.mapped_to, p.upper_status) AS normalized_status FROM pre p LEFT JOIN ref_order_status_synonyms s ON p.compact_status = s.pattern AND s.match_type = 'EXACT' ), chk AS ( SELECT n., CASE WHEN nullish_reason IS NOT NULL THEN 'NULL_OR_PLACEHOLDER' WHEN d.status_code IS NULL THEN 'NOT_IN_ENUM' ELSE NULL END AS violation_reason FROM norm n LEFT JOIN dim_order_status d ON n.normalized_status = d.status_code ) -- 统计输出 SELECT SUM(CASE WHEN violation_reason IS NULL THEN 1 ELSE 0 END) AS valid_cnt, SUM(CASE WHEN violation_reason IS NOT NULL THEN 1 ELSE 0 END) AS invalid_cnt, COUNT() AS total_cnt, ROUND(100.0 * SUM(CASE WHEN violation_reason IS NOT NULL THEN 1 ELSE 0 END) / NULLIF(COUNT(*),0), 4) AS invalid_pct FROM chk;

-- 有效值分布 SELECT normalized_status AS status_code, COUNT(*) AS cnt FROM chk WHERE violation_reason IS NULL GROUP BY normalized_status ORDER BY cnt DESC;

-- 未知/违规 TopN SELECT COALESCE(upper_status, '(NULL)') AS raw_upper_status, COUNT(*) AS cnt, MIN(violation_reason) AS reason FROM chk WHERE violation_reason IS NOT NULL GROUP BY COALESCE(upper_status, '(NULL)') ORDER BY cnt DESC FETCH FIRST 50 ROWS WITH TIES;

  1. 违规落库与推荐映射(ANSI 风格 SQL) -- 建议先建宽表用于审计 CREATE TABLE IF NOT EXISTS bad_records.fact_orders_order_status ( order_id VARCHAR(128), raw_order_status VARCHAR(256), normalized_status VARCHAR(64), violation_reason VARCHAR(64), recommended_mapping VARCHAR(64), source_system VARCHAR(128), etl_batch_id VARCHAR(64), processed_at TIMESTAMP );

-- 插入违规记录(recommended_mapping:若 enum 中不存在且同义词表命中则给出;否则 NULL) INSERT INTO bad_records.fact_orders_order_status SELECT c.order_id, c.order_status AS raw_order_status, c.normalized_status, c.violation_reason, CASE WHEN c.violation_reason = 'NOT_IN_ENUM' THEN rs2.mapped_to ELSE NULL END AS recommended_mapping, c.source_system, :batch_id AS etl_batch_id, CURRENT_TIMESTAMP AS processed_at FROM chk c LEFT JOIN ref_order_status_synonyms rs2 ON c.compact_status = rs2.pattern AND rs2.match_type = 'EXACT' WHERE c.violation_reason IS NOT NULL;

  1. 低风险自动修复(只更正大小写/空格/已知常见拼写,且当整体违规占比 ≤ 0.05%) -- 计算违规占比 WITH stats AS ( SELECT SUM(CASE WHEN violation_reason IS NOT NULL THEN 1 ELSE 0 END) AS invalid_cnt, COUNT(*) AS total_cnt FROM chk ) -- 示例:仅当门槛满足时,对可安全映射的数据进行就地更新(请根据目标仓库是否允许 UPDATE 调整实现) UPDATE fact_orders t SET order_status = c.normalized_status FROM chk c, stats s WHERE t.order_id = c.order_id AND c.violation_reason IN ('NOT_IN_ENUM') -- 仅处理可映射的轻微问题 AND EXISTS (SELECT 1 FROM dim_order_status d WHERE d.status_code = c.normalized_status) AND (1.0 * s.invalid_cnt / NULLIF(s.total_cnt, 0)) <= 0.0005;

-- 无法安全修复的记录保留在 bad_records,待业务确认后回填

  1. 数据入仓的统一标准化函数(PySpark,批/流皆可) from pyspark.sql import functions as F, types as T

ALLOWED = {"PENDING","PAID","SHIPPED","DELIVERED","CANCELLED","REFUNDED","FAILED_PAYMENT"} PLACEHOLDERS = {"NULL","N/A","NA","NONE","-","UNKNOWN"}

COMMON_MAP = { "PAYED": "PAID", "SHIPED": "SHIPPED", "SHIPPEDD": "SHIPPED", "CANCELED": "CANCELLED", "FAILED PAYMENT": "FAILED_PAYMENT", "FAILEDPAYMENT": "FAILED_PAYMENT", }

@F.udf("string") def normalize_status(s): if s is None: return None t = s.strip() if t == "": return None up = t.upper() if up in PLACEHOLDERS: return None compact = up.replace(" ", "") if compact in ALLOWED: return compact if compact in COMMON_MAP: return COMMON_MAP[compact] # 保守策略:不做模糊修复,避免误判;如需,可增加 levenshtein <= 1 的映射白名单 return None

def with_status_validation(df): df1 = df.withColumn("normalized_status", normalize_status(F.col("order_status"))) df_valid = df1.filter(F.col("normalized_status").isin(list(ALLOWED))) df_invalid = df1.filter(~F.col("normalized_status").isin(list(ALLOWED))) return df_valid, df_invalid

使用示例

df_orders: 输入 DataFrame,包含 order_id, order_status, source_system 等

df_valid, df_invalid = with_status_validation(df_orders)

统计

df_stats = df1.groupBy("normalized_status").count()

落库违规

df_invalid
.withColumn("violation_reason", F.when(F.col("normalized_status").isNull(), F.lit("NULL_OR_PLACEHOLDER")) .otherwise(F.lit("NOT_IN_ENUM")))
.withColumn("etl_batch_id", F.lit(batch_id))
.withColumn("processed_at", F.current_timestamp())
.write.mode("append").insertInto("bad_records.fact_orders_order_status")

将 df_valid 写入事实表(注意仅写 normalized_status)

df_valid.select( "order_id", F.col("normalized_status").alias("order_status"), "source_system", # 其他业务列... ).write.mode("append").insertInto("fact_orders")

  1. dbt 通用测试(可复用的 generic test) -- models/schema.yml version: 2 models:
  • name: fact_orders columns:
    • name: order_status tests:
      • not_null
      • accepted_values: values: ['PENDING','PAID','SHIPPED','DELIVERED','CANCELLED','REFUNDED','FAILED_PAYMENT']

-- 若需标准化,可在 staging 层构建视图:upper(trim(order_status)) as order_status,然后在该视图上做 accepted_values

六、部署与运行顺序建议

  1. 在接入层或 Staging 层执行 normalize(trim + upper + 同义词映射),并产出 normalized_status。
  2. 用维表 dim_order_status 做枚举校验与维度对齐;禁止非维表值进入事实表最终列。
  3. 生成统计输出(分布、未知 TopN、违规率)。
  4. 将违规数据落库 bad_records.fact_orders_order_status,并带上推荐映射。
  5. 若违规率 ≤ 0.05%,仅执行安全自动修复(小写、空格、常见拼写);否则输出修复清单,待业务确认后回填。
  6. 在报表层对未知状态设置兜底分组并显示修复指引。
  7. 持续对齐业务状态机与事件事件表(可另设状态跃迁约束检查)。

外键校验对象

  • 表与列:fact_orders.customer_id
  • 参照维表:dim_customer.customer_id
  • 关联时效:SCD2 生效区间 [valid_from, valid_to)
  • 可选状态约束:is_active = 1(通过参数启用)

一、校验规则(可执行约束)

  1. 值域与完整性
  • 非空:customer_id 不得为 NULL。
  • 正整数:customer_id > 0,禁止 0、负数。
  • 禁止占位符:不得为 -1、999999、unknown、N/A 等异常枚举(具体黑名单可配置)。
  • 失败记录标注为 reason_code=VALUE_DOMAIN_INVALID。
  1. 外键存在性
  • 必须能在 dim_customer 中找到相同 customer_id 的行。
  • 失败记录标注为 reason_code=FK_NOT_FOUND。
  1. SCD2 时效一致性
  • 对于 SCD2 维表:订单发生时间满足 valid_from <= order_date < valid_to(valid_to 为空视为无穷大)。
  • 失败记录标注为 reason_code=OUT_OF_SCD2_RANGE。
  1. 活跃状态一致性(可配置)
  • 当 active_check_enabled=true 时,要求 is_active = 1 在订单发生时刻对应的维表版本上成立。
  • 失败记录标注为 reason_code=INACTIVE_AT_ORDER_TIME。
  1. 阈值与发布策略
  • 失配率阈值 blocking_threshold=0.1%(可配置)。计算口径:失配记录数 / 当期校验样本总数。
  • 若失配率 > 阈值:阻断分区发布并触发维表回灌/事实修复;否则允许发布但在日报提示并安排修复批次。

二、检查步骤(分区级或时间窗口级执行)

  1. 确定校验范围
  • 建议按分区字段(如 dt 或 order_date 的自然日)执行;实时/微批增加延迟校验窗口 grace_window(默认 24 小时,可配置),以覆盖维表晚到。
  1. 值域与空值初筛
  • 统计并标记 customer_id 为空、非正整数、命中占位符黑名单的记录,为 VALUE_DOMAIN_INVALID。
  1. 左连接维表做参照检查
  • 使用 customer_id 作为连接键。
  • SCD2 情况:连接条件包含 order_date ∈ [valid_from, valid_to)。valid_to 为空等价于 +∞。
  • 若连接不到任意维表行:标记为 FK_NOT_FOUND。
  • 若连接到多条维表行(区间重叠):标记为 DIM_OVERLAP_CONFLICT(维表质量问题,建议单独告警)。
  1. 时效与状态核验
  • 对已连接到的维表版本校验区间覆盖;不满足则 OUT_OF_SCD2_RANGE。
  • 当 active_check_enabled=true 且 is_active ≠ 1:INACTIVE_AT_ORDER_TIME。
  1. 质量统计与画像
  • 产出总量、各类失配计数与占比。
  • 维度切片:按渠道 channel、地区 region 等核心维度统计失配分布,识别热点来源。
  • 将指标写入数据质量指标表/监控系统(含分区、执行批次、时间戳)。
  1. 处置与落地
  • 将异常记录写入 bad_records.fact_orders_customer_fk,字段建议包含:
    • fact 主键与关键信息:order_id、customer_id、order_date、channel、region、dt/ingest_batch_id
    • 维度追溯:reason_code、reason_detail、detected_at
    • 修复建议:suggested_fix(示例:补齐客户注册事件或维表SCD2版本;核对order_date时区)
  • 根据阈值策略决定是否阻断分区发布,并自动创建维表回灌/事实修复任务单。

三、统计口径与输出样例

  1. 指标定义
  • total_records:本次校验样本总数(剔除测试/删除标志记录,如 applicable)。
  • invalid_value_count:VALUE_DOMAIN_INVALID 数量。
  • orphan_count:FK_NOT_FOUND 数量。
  • scd2_mismatch_count:OUT_OF_SCD2_RANGE 数量。
  • inactive_count:INACTIVE_AT_ORDER_TIME 数量(仅当启用)。
  • overlap_conflict_count:DIM_OVERLAP_CONFLICT 数量(维表异常)。
  • mismatch_count:上述失配合计(不重复计数,优先级:VALUE_DOMAIN_INVALID > FK_NOT_FOUND > OUT_OF_SCD2_RANGE > INACTIVE_AT_ORDER_TIME)。
  • mismatch_ratio:mismatch_count / total_records。
  1. 维度分布样例(展示口径)
  • 失配按渠道分布(top 5):channel, mismatch_count, mismatch_ratio
  • 失配按地区分布(top 5):region, mismatch_count, mismatch_ratio
  • 趋势:近7天 mismatch_ratio 日级趋势
  1. 指标输出样例(示意)
  • total_records=1,000,000
  • mismatch_count=850(0.085%)
    • invalid_value_count=120
    • orphan_count=500
    • scd2_mismatch_count=200
    • inactive_count=30
    • overlap_conflict_count=0
  • 渠道分布:app 0.12%,web 0.06%,offline 0.04%

四、异常处理与运维策略

  1. 阻断与回灌
  • 若 mismatch_ratio > 0.1%:阻断 dt=2025-12-17 分区的发布;触发:
    • 维表回灌:补齐缺失客户、修复 SCD2 区间(消除重叠/断裂),重算 valid_to。
    • 事实修复:校正误填 customer_id、统一 order_date 时区。
  • 否则:分区可发布,必须推送日报与工单,安排 T+1/T+N 修复批次。
  1. 重试与延迟窗口
  • 实时/微批:对 FK_NOT_FOUND 记录允许在 grace_window 内重试匹配;仍失败才落 bad_records。
  • 离线批:允许一次性补维后重跑校验步骤。
  1. 告警分级
  • P1:mismatch_ratio > 0.5% 或 overlap_conflict_count > 0(维表结构性问题)
  • P2:0.1% < mismatch_ratio ≤ 0.5%
  • P3:mismatch_ratio ≤ 0.1% 但某渠道/地区异常偏高(阈值倍数触发)

五、配置项(建议以集中配置管理)

  • blocking_threshold(默认 0.1%)
  • active_check_enabled(默认 true 或按业务场景设定)
  • grace_window_hours(默认 24)
  • placeholder_blacklist:[-1, 0, 999999, 'unknown', 'N/A']
  • join_time_zone:订单时间与维表时区对齐策略(例如统一为 UTC)
  • distribution_dimensions:['channel', 'region']

六、实施与治理建议

  • 事实入库前置“客户字典预对齐”节点,确保关键客户在维表存在后再放行事实写入(降低孤儿率)。
  • 维表端约束:
    • customer_id 非空、唯一;SCD2 各版本区间不重叠且无缝衔接(前闭后开)。
    • 索引覆盖 customer_id、valid_from、valid_to,提高时效连接性能。
  • 事实层延迟校验窗口(grace_window)以缓解维表同步延迟。
  • 在数据字典/血缘中登记:
    • 外键关系:fact_orders.customer_id → dim_customer.customer_id
    • 时效字段:order_date、valid_from、valid_to 与边界规则
    • 回溯策略:维表回灌、事实重放范围与步骤
  • 时区与时间精度统一:明确 valid_from/valid_to 与 order_date 的时区与精度(到日/到秒),确保比较关系一致。
  • 审计与合规:保留校验结果、坏数据样本、修复流水与审批记录,满足外部稽核要求。

七、坏数据表结构建议(字段清单)

  • order_id, customer_id, order_date, dt/ingest_batch_id
  • channel, region
  • reason_code(VALUE_DOMAIN_INVALID | FK_NOT_FOUND | OUT_OF_SCD2_RANGE | INACTIVE_AT_ORDER_TIME | DIM_OVERLAP_CONFLICT)
  • reason_detail(具体原因说明,如“缺少维表SCD2版本”)
  • suggested_fix(如“补充客户注册事件/修复 SCD2 区间/统一时区后重放”)
  • detected_at(校验时间)

通过上述规则与步骤,可实现对 fact_orders.customer_id 的外键完整性、SCD2 时效一致性及可选活跃状态的一致校验,同时提供可观测的质量指标、明确的异常处置路径与落地治理保障。

示例详情

解决的问题

用一条高效提示词,自动生成可执行的数据验证规则,让每一列数据都有清晰边界与可追溯标准,帮助团队把“数据干净度”变成可复制的竞争力。

  • 快速:输入表名、列名与期望语言,数秒生成规则与检查步骤。
  • 专业:以“数据工程专家”视角给出权威、严谨的规范与说明。
  • 全面:覆盖格式、取值范围、唯一性、缺失值、依赖关系与常见异常处理。
  • 可落地:附带检查要点与实施建议,便于在表格与数据平台中执行。
  • 易传播:输出结构清晰,适合沉淀到数据字典与团队手册,降低沟通成本。
  • 降本增效:显著减少返工、缩短报表上线周期,提升指标可信度。
  • 多场景:适用于报表上线前校验、日常数据治理、质量巡检与合规审计准备。

适用用户

数据工程师

在导入、转换、落库环节快速生成列级校验规则,配套示例数据与说明,缩短上线周期并减少回滚。

数据平台/数仓负责人

搭建统一的字段校验模板,批量覆盖核心表,提升数据质量标准化,支持审计与走查。

数据分析师与BI开发者

为指标口径关键字段制定格式与范围限制,提前拦截脏数据,减少报表返工,加快周报月报交付。

特征总结

一键为指定表列生成可落地的数据验证规则,迅速建立数据质量防线,并可立即应用于导入与清洗环节。
智能识别常见字段类型,自动给出范围、格式、唯一性等校验方案,减少手工配置时间。
支持按需输出中文或英文说明与步骤,便于跨团队沟通与留存,降低实施与培训成本。
提供示例数据与通过/拒绝样例,快速自测规则效果,避免上线后频繁返工。
可根据业务场景生成严格版与宽松版规则,敏捷迭代,平衡数据质量与处理效率。
结构化呈现规则目的、适用范围、注意项,让新同事也能快速理解与接手维护。
参数化配置表名与列名,批量复用模板,轻松在多项目、多环境中统一标准。
结合处理链路上下文,建议在导入、转换、存储各环节设置校验点,减少问题外溢。
输出可直接落地的实施指引与变更清单,支持快速走查与审计,提升合规与透明度。
与报表与分析目标对齐,减少脏数据影响指标,提升决策可信度与交付节奏。

如何使用购买的提示词模板

1. 直接在外部 Chat 应用中使用

将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。

2. 发布为 API 接口调用

把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。

3. 在 MCP Client 中配置使用

在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。

AI 提示词价格
¥25.00元
先用后买,用好了再付款,超安全!

您购买后可以获得什么

获得完整提示词模板
- 共 286 tokens
- 7 个可调节参数
{ 目标数据表名称 } { 目标数据列名称 } { 数据验证类型 } { 校验规则详细说明 } { 输出示例代码 } { 示例代码语言 } { 数据样本示例 }
获得社区贡献内容的使用权
- 精选社区优质案例,助您快速上手提示词
使用提示词兑换券,低至 ¥ 9.9
了解兑换券 →
限时半价

不要错过!

半价获取高级提示词-优惠即将到期

17
:
23
小时
:
59
分钟
:
59