
Data Column Validation Rules

Updated Nov 24, 2025

Generates professional data-validation rules for a specified table column, ensuring accuracy and consistency. Suited to data cleaning, transformation, and pipeline building, it improves data quality and the efficiency of data-engineering tasks.

-- Goal: ensure dw_user_profile.email is usable, unique, and auditable
-- Assumes PostgreSQL 12+ (expression indexes and generated columns);
-- other engines can adapt the approach.

-- 1) Normalization/cleaning UDF: trim and lowercase uniformly; remove control
-- characters and whitespace; handle common full-width variants and Chinese punctuation
CREATE SCHEMA IF NOT EXISTS dw;

CREATE OR REPLACE FUNCTION dw.email_norm(raw text)
RETURNS text
LANGUAGE plpgsql IMMUTABLE STRICT
AS $$
DECLARE
  s text;
BEGIN
  -- base normalization: trim surrounding whitespace + lowercase
  s := lower(btrim(raw));

  -- convert full-width (ideographic) spaces to half-width, then strip all
  -- whitespace (email addresses may not contain whitespace)
  s := replace(s, '　', ' ');
  s := regexp_replace(s, '\s+', '', 'g');

  -- remove control characters (including invisible ones)
  s := regexp_replace(s, '[[:cntrl:]]', '', 'g');

  -- map common full-width symbols to half-width: ＠．－＋＿％ -> @.-+_%
  s := translate(s, '＠．－＋＿％', '@.-+_%');

  -- strip common Chinese punctuation (if written in by mistake)
  s := regexp_replace(s, '[，、。；：]', '', 'g');

  -- treat the empty string after normalization as NULL
  RETURN NULLIF(s, '');
END;
$$;
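For unit-testing the normalization rules outside the database, a Python mirror of the UDF can be handy (an illustrative sketch; NFKC folding stands in for the SQL translate() mapping):

```python
import re
import unicodedata

def email_norm(raw):
    """Python mirror of the dw.email_norm SQL UDF (illustrative sketch)."""
    if raw is None:
        return None
    s = raw.strip().lower()
    # strip common Chinese punctuation written in by mistake (before NFKC,
    # which would fold full-width commas into ASCII ones)
    s = re.sub(r"[，、。；：]", "", s)
    # fold full-width variants (＠ ． － etc.) to half-width
    s = unicodedata.normalize("NFKC", s)
    # remove all whitespace (including full-width spaces) and control chars
    s = re.sub(r"\s+", "", s)
    s = re.sub(r"[\x00-\x1f\x7f]", "", s)
    # empty string after normalization becomes None, like NULLIF(s, '')
    return s or None
```

This lets the cleansing rules be covered by ordinary unit tests before they are baked into a migration.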

-- 2) Generated column: persist the normalized value for indexing and auditing
ALTER TABLE dw_user_profile
  ADD COLUMN IF NOT EXISTS email_norm TEXT
  GENERATED ALWAYS AS (dw.email_norm(email)) STORED;

-- 3) Type match: store as VARCHAR/STRING in UTF-8 (the PostgreSQL default)
-- The length cap can follow the common convention of 254 (optional)
ALTER TABLE dw_user_profile
  ALTER COLUMN email TYPE VARCHAR(254);

-- 4) Non-empty check: forbid NULL, empty strings, whitespace-only values,
-- and placeholder text such as "null" / "n/a"
-- Note: PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; DROP CONSTRAINT IF EXISTS first when re-running
ALTER TABLE dw_user_profile
  ADD CONSTRAINT chk_email_nonempty
  CHECK (email_norm IS NOT NULL AND email_norm NOT IN ('null', 'n/a'));

-- 5) Format / regex check. Requirements:
--  - exactly one @
--  - local part: [A-Za-z0-9_%+-]; must not start or end with a dot; no consecutive dots
--  - domain: subdomains allowed; each label is alphanumeric plus hyphens
--    (hyphen not at the start or end of a label)
--  - top-level domain: 2-15 letters
-- Note: stricter validation (IDN transcoding, MX lookups) belongs in the ETL job
ALTER TABLE dw_user_profile
  ADD CONSTRAINT chk_email_format
  CHECK (email_norm ~ '^[A-Za-z0-9_%+-]+(\.[A-Za-z0-9_%+-]+)*@([A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?\.)+[A-Za-z]{2,15}$');
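For quick self-tests outside the database, the pattern can be transcribed to Python (a sketch; it assumes the reconstructed regex in the CHECK constraint above):

```python
import re

# The CHECK constraint's pattern, transcribed for quick self-tests
EMAIL_RE = re.compile(
    r"^[A-Za-z0-9_%+-]+(?:\.[A-Za-z0-9_%+-]+)*"           # local part: no leading/trailing/double dots
    r"@(?:[A-Za-z0-9](?:[A-Za-z0-9-]*[A-Za-z0-9])?\.)+"   # subdomains; hyphen not at label edges
    r"[A-Za-z]{2,15}$"                                    # TLD: 2-15 letters
)

assert EMAIL_RE.match("a.b_c%d+e@sub.example.com")
assert not EMAIL_RE.match(".leading@example.com")   # leading dot in local part
assert not EMAIL_RE.match("a..b@example.com")       # consecutive dots
assert not EMAIL_RE.match("a@-bad.com")             # hyphen at label start
assert not EMAIL_RE.match("a@example.c")            # TLD too short
```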

-- 6) Uniqueness: table-wide, case- and whitespace-insensitive
-- (via the normalized generated column)
CREATE UNIQUE INDEX IF NOT EXISTS ux_dw_user_profile_email_norm
  ON dw_user_profile (email_norm);

-- 7) Dependency-consistency example (if an email_verified column exists):
-- unverified emails may exist, but their format must still be valid; that is
-- already guaranteed by the format/non-empty checks above.
-- Additional rule: email_verified must be a proper boolean
ALTER TABLE dw_user_profile
  ADD CONSTRAINT chk_email_verified_consistency
  CHECK (email_verified IS NULL OR email_verified IN (TRUE, FALSE));

-- 8) Quality view: surface problem samples (NULL/placeholder/bad format)
CREATE OR REPLACE VIEW dw.dq_vw_invalid_email AS
SELECT id, email, email_norm,
       CASE
         WHEN email_norm IS NULL THEN 'null or whitespace-only'
         WHEN email_norm IN ('null', 'n/a') THEN 'placeholder text'
         WHEN email_norm !~ '^[A-Za-z0-9_%+-]+(\.[A-Za-z0-9_%+-]+)*@([A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?\.)+[A-Za-z]{2,15}$' THEN 'bad format'
         ELSE 'unknown'
       END AS issue
FROM dw_user_profile
WHERE email_norm IS NULL
   OR email_norm IN ('null', 'n/a')
   OR email_norm !~ '^[A-Za-z0-9_%+-]+(\.[A-Za-z0-9_%+-]+)*@([A-Za-z0-9]([A-Za-z0-9-]*[A-Za-z0-9])?\.)+[A-Za-z]{2,15}$';

-- 9) Duplicate-detection view: emails that collide after normalization.
-- Orders ids so the newest active record comes first, for the repair job below.
CREATE OR REPLACE VIEW dw.dq_vw_duplicate_email AS
SELECT email_norm,
       COUNT(*) AS dup_count,
       ARRAY_AGG(id ORDER BY COALESCE(is_active, FALSE) DESC,
                 COALESCE(updated_at, 'epoch'::timestamp) DESC) AS ordered_ids
FROM dw_user_profile
WHERE email_norm IS NOT NULL
GROUP BY email_norm
HAVING COUNT(*) > 1;

-- 10) Example repair job: keep the newest active record, archive the other
-- duplicates, and write the audit trail in the same statement (a CTE cannot
-- span statements, so the UPDATE's RETURNING feeds the INSERT directly).
-- Requires an audit table dw.audit_log(entity, entity_id, action, details, occurred_at)
WITH dup AS (
  SELECT email_norm,
         ARRAY_AGG(id ORDER BY COALESCE(is_active, FALSE) DESC,
                   COALESCE(updated_at, 'epoch'::timestamp) DESC) AS ids
  FROM dw_user_profile
  WHERE email_norm IS NOT NULL
  GROUP BY email_norm
  HAVING COUNT(*) > 1
), to_archive AS (
  SELECT email_norm, unnest(ids[2:]) AS id, ids[1] AS kept_id
  FROM dup
), archived AS (
  UPDATE dw_user_profile u
  SET is_active = FALSE,
      status = 'archived_duplicate',
      updated_at = NOW()
  FROM to_archive t
  WHERE u.id = t.id
  RETURNING t.id, t.email_norm, t.kept_id
)
INSERT INTO dw.audit_log(entity, entity_id, action, details, occurred_at)
SELECT 'dw_user_profile', a.id, 'duplicate_email_archived',
       jsonb_build_object('email_norm', a.email_norm, 'kept_id', a.kept_id, 'archived_id', a.id),
       NOW()
FROM archived a;
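The keep-newest-active policy in step 10 can be mirrored in plain Python for unit tests (an illustrative sketch; column names follow the SQL above):

```python
from datetime import datetime

def split_duplicates(rows):
    """rows: list of dicts with id, email_norm, is_active, updated_at.
    Returns (ids to keep, ids to archive), one kept id per normalized email."""
    groups = {}
    for r in rows:
        groups.setdefault(r["email_norm"], []).append(r)
    keep, archive = [], []
    for dup in groups.values():
        # same ordering as the SQL ARRAY_AGG: active first, then newest
        dup.sort(key=lambda r: (not r["is_active"], -r["updated_at"].timestamp()))
        keep.append(dup[0]["id"])
        archive.extend(r["id"] for r in dup[1:])
    return keep, archive
```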

-- 11) Optional pre-write cleanup trigger: trim + lowercase the raw email
-- column for stronger consistency
CREATE OR REPLACE FUNCTION dw.enforce_email_canonical()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
  IF NEW.email IS NOT NULL THEN
    NEW.email := lower(btrim(NEW.email));
  END IF;
  RETURN NEW;
END;
$$;

DROP TRIGGER IF EXISTS trg_dw_user_profile_email_canonical ON dw_user_profile;
CREATE TRIGGER trg_dw_user_profile_email_canonical
  BEFORE INSERT OR UPDATE ON dw_user_profile
  FOR EACH ROW EXECUTE FUNCTION dw.enforce_email_canonical();

PySpark data validation for fact_order_item.total_amount

Rules implemented:

- Non-null: forbid NULL and the empty string; canceled orders must use 0, not NULL.

- Type match: normalize to DECIMAL(18,2) with two decimals via HALF_UP rounding; forbid scientific notation and currency symbols; strip thousands separators and non-digit characters; numericize before write.

- Range check: 0 <= total_amount <= 1_000_000; negatives are not allowed; out-of-range values are flagged for business review.

- Dependency check: total_amount ≈ quantity * unit_price within 0.01; if discount_amount/tax_amount are present, total_amount = quantity * unit_price - discount_amount + tax_amount. Sample-based verification is supported for batch runs.

- Consistency: the same natural key across partitions/batches must carry a consistent amount.

- Error handling: emit detailed error records for non-numeric/overflow/consistency issues.
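The dependency rule above can be sketched with plain floats (illustrative helper, not part of the Spark job):

```python
def recalc_matches(total, quantity, unit_price, discount=0.0, tax=0.0, tol=0.01):
    """Check total_amount == quantity * unit_price - discount + tax within tol."""
    expected = quantity * unit_price - discount + tax
    return abs(total - expected) <= tol

assert recalc_matches(19.98, 2, 9.99)
assert recalc_matches(18.98, 2, 9.99, discount=1.00)
assert not recalc_matches(25.00, 2, 9.99)
```

The 0.01 tolerance absorbs both rounding to two decimals and floating-point noise; the Spark implementation below applies the same comparison on cast doubles.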

from typing import Dict, List, Optional

from pyspark.sql import SparkSession, DataFrame, functions as F, types as T

spark = SparkSession.builder.getOrCreate()

AMOUNT_UPPER_LIMIT = 1_000_000.0
AMOUNT_TOLERANCE = 0.01
DECIMAL_TYPE = T.DecimalType(18, 2)

def has_cols(df: DataFrame, cols: List[str]) -> bool:
    return all(c in df.columns for c in cols)

def first_existing(df: DataFrame, candidates: List[str]) -> Optional[str]:
    for c in candidates:
        if c in df.columns:
            return c
    return None

def normalize_total_amount(df_raw: DataFrame, total_col: str = "total_amount") -> DataFrame:
    r"""
    1) Strip thousands separators and currency symbols.
    2) Forbid scientific notation; mark as invalid if the value contains e/E.
    3) Enforce the numeric pattern ^-?\d+(\.\d+)?$
    4) Cast to DOUBLE, round HALF_UP to 2 decimals, cast to DECIMAL(18,2).
    Returns df with:
      - total_amount_clean_str
      - total_amount_num_dec (DECIMAL(18,2)), normalized
      - flags: invalid_number, sci_notation_found
    """
    # capture the raw string for diagnostics
    df = df_raw.withColumn("total_amount_raw_str", F.col(total_col).cast("string"))

    # detect scientific notation up front
    df = df.withColumn(
        "sci_notation_found",
        F.when(F.col("total_amount_raw_str").rlike("(?i)[+-]?[0-9]*\\.?[0-9]+[eE][+-]?[0-9]+"), F.lit(True)).otherwise(F.lit(False)),
    )

    # remove common currency symbols, thousands separators, and spaces;
    # keep digits, dot, minus
    df = df.withColumn(
        "total_amount_clean_str",
        F.regexp_replace(F.col("total_amount_raw_str"), r"[,\s￥¥$€£]", "")
    ).withColumn(
        "total_amount_clean_str",
        F.regexp_replace(F.col("total_amount_clean_str"), r"[^0-9\.\-]", "")
    )

    # validate a simple decimal pattern (no scientific notation)
    numeric_pattern = r"^-?\d+(\.\d+)?$"
    df = df.withColumn(
        "numeric_like",
        F.when(F.col("total_amount_clean_str").rlike(numeric_pattern), F.lit(True)).otherwise(F.lit(False))
    )

    # mark invalid numbers (scientific notation or a bad pattern)
    df = df.withColumn(
        "invalid_number",
        (~F.col("numeric_like")) | F.col("sci_notation_found")
    )

    # cast to double, round HALF_UP via round(), then cast to DECIMAL(18,2);
    # invalid_number rows yield NULL after the cast; keep them for error reporting
    df = df.withColumn(
        "total_amount_num_dbl",
        F.when(~F.col("invalid_number"), F.col("total_amount_clean_str").cast("double")).otherwise(F.lit(None).cast("double"))
    )

    df = df.withColumn(
        "total_amount_num_dec",
        F.when(F.col("total_amount_num_dbl").isNotNull(), F.round(F.col("total_amount_num_dbl"), 2).cast(DECIMAL_TYPE)).otherwise(F.lit(None).cast(DECIMAL_TYPE))
    )

    return df
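Spark's round() applies HALF_UP rounding; the same normalization can be sanity-checked locally with Python's decimal module (a minimal sketch of the cleaning + rounding rule, not the Spark code itself):

```python
from decimal import Decimal, ROUND_HALF_UP

def normalize_amount(raw: str) -> Decimal:
    """Strip currency symbols and thousands separators, then round HALF_UP
    to two decimal places. Raises on scientific notation, as the rule forbids it."""
    cleaned = raw.replace(",", "").lstrip("￥¥$€£ ")
    if "e" in cleaned.lower():
        raise ValueError("scientific notation is not accepted")
    return Decimal(cleaned).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)

assert normalize_amount("¥1,234.565") == Decimal("1234.57")  # HALF_UP, not banker's rounding
assert normalize_amount("0.005") == Decimal("0.01")
```

Note that float-based rounding would give 1234.56 here for some inputs; going through Decimal keeps the HALF_UP semantics exact, which is why the Spark job rounds before the final DECIMAL(18,2) cast.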

def validate_total_amount(df_raw: DataFrame, total_col: str = "total_amount",
                          sample_fraction: float = 0.1, seed: int = 42) -> Dict[str, DataFrame]:
    """
    Apply validation rules to fact_order_item.total_amount.
    Returns a dict with:
      - valid_df: records passing all checks, total_amount normalized to DECIMAL(18,2) in column total_amount_std
      - errors_df: row-level error details (multiple rows per input possible if multiple checks fail)
      - consistency_conflicts_df: group-level inconsistencies across partitions/batches
      - diff_report_df: summary stats for recomputation differences
      - dist_stats_df: distribution stats for spike detection
    """
    df = normalize_total_amount(df_raw, total_col)

    # Non-empty check
    df = df.withColumn("is_empty_str", F.trim(F.col("total_amount_raw_str")) == "")

    # Non-null check
    df = df.withColumn("is_null", F.col(total_col).isNull())

    # Type check failures
    type_fail = (F.col("invalid_number") | F.col("total_amount_num_dec").isNull())

    # Range check: 0 <= amount <= 1_000_000
    df = df.withColumn("range_underflow", F.col("total_amount_num_dec") < F.lit(0).cast(DECIMAL_TYPE))
    df = df.withColumn("range_overflow", F.col("total_amount_num_dec") > F.lit(AMOUNT_UPPER_LIMIT).cast(DECIMAL_TYPE))

    # Recompute the expected amount
    q = F.col("quantity").cast("double") if "quantity" in df.columns else F.lit(None).cast("double")
    p = F.col("unit_price").cast("double") if "unit_price" in df.columns else F.lit(None).cast("double")

    has_disc = "discount_amount" in df.columns
    has_tax = "tax_amount" in df.columns

    d = F.coalesce(F.col("discount_amount").cast("double"), F.lit(0.0)) if has_disc else F.lit(0.0)
    t = F.coalesce(F.col("tax_amount").cast("double"), F.lit(0.0)) if has_tax else F.lit(0.0)

    base = q * p
    expected_expr = F.when(
        ((F.col("discount_amount").isNull()) & (F.col("tax_amount").isNull())) if (has_disc and has_tax) else F.lit(False),
        base
    ).otherwise(base - d + t)

    # Only compute when q and p are present
    expected_expr = F.when(q.isNotNull() & p.isNotNull(), expected_expr).otherwise(F.lit(None).cast("double"))

    df = df.withColumn("expected_total_dbl", expected_expr)
    df = df.withColumn("expected_total_dec", F.when(F.col("expected_total_dbl").isNotNull(),
                                                    F.round(F.col("expected_total_dbl"), 2).cast(DECIMAL_TYPE)).otherwise(F.lit(None).cast(DECIMAL_TYPE)))

    # Absolute difference in double for the tolerance comparison
    df = df.withColumn("recalc_abs_diff",
                       F.when(F.col("expected_total_dec").isNotNull() & F.col("total_amount_num_dec").isNotNull(),
                              F.abs(F.col("total_amount_num_dec").cast("double") - F.col("expected_total_dec").cast("double")))
                        .otherwise(F.lit(None).cast("double")))
    df = df.withColumn("recalc_mismatch",
                       F.when(F.col("recalc_abs_diff").isNotNull(), F.col("recalc_abs_diff") > F.lit(AMOUNT_TOLERANCE))
                        .otherwise(F.lit(False)))

    # Sample-based verification (reduces cost in batch runs); sampling by
    # order_id gives better coverage. Deduplicate the sampled keys before
    # joining so matched rows are not multiplied.
    sample_keys = (df.sample(withReplacement=False, fraction=sample_fraction, seed=seed)
                     .select("order_id").distinct()
                     .withColumn("in_sample", F.lit(True)))
    df = (df.join(sample_keys, on="order_id", how="left")
            .withColumn("in_sample", F.coalesce(F.col("in_sample"), F.lit(False))))

    # Rows outside the sample keep their computed mismatch flag here; if you
    # prefer strict sampling, uncomment the next line:
    # df = df.withColumn("recalc_mismatch", F.when(F.col("in_sample"), F.col("recalc_mismatch")).otherwise(F.lit(False)))

    # Consistency check across batches/partitions for the same natural key
    key_cols = []
    for candidates in [["order_id", "order_item_id"], ["order_id", "item_id"], ["order_id", "sku_id"]]:
        if has_cols(df, candidates):
            key_cols = candidates
            break
    if not key_cols:
        key_cols = ["order_id"]  # fallback if no item-level key is present

    # Use whatever batch/partition identifiers exist for diagnostics
    part_cols = [c for c in ["batch_id", "ingest_date", "etl_date", "dt", "partition_date"] if c in df.columns]

    # Build error rows
    error_rows = []

    # Non-null and empty checks
    nonnull_fail_df = df.where(F.col("is_null") | F.col("is_empty_str")) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_raw_str").alias("value"),
                F.lit("NON_NULL").alias("error_type"),
                F.when(F.col("is_null"), F.lit("total_amount is NULL"))
                 .when(F.col("is_empty_str"), F.lit("total_amount is empty string"))
                 .otherwise(F.lit("unknown")).alias("error_reason"))
    error_rows.append(nonnull_fail_df)

    # Type failures (invalid_number or cast failed)
    type_fail_df = df.where(type_fail) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_raw_str").alias("value"),
                F.lit("TYPE_MISMATCH").alias("error_type"),
                F.when(F.col("sci_notation_found"), F.lit("scientific notation forbidden"))
                 .when(F.col("invalid_number"), F.lit("non-numeric after cleaning"))
                 .otherwise(F.lit("cast to DECIMAL(18,2) failed")).alias("error_reason"))
    error_rows.append(type_fail_df)

    # Range failures
    range_fail_df = df.where(F.col("range_underflow") | F.col("range_overflow")) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_raw_str").alias("value"),
                F.lit("RANGE").alias("error_type"),
                F.when(F.col("range_underflow"), F.lit("amount < 0 not allowed"))
                 .when(F.col("range_overflow"), F.lit("amount > 1,000,000 requires review"))
                 .otherwise(F.lit("range violation")).alias("error_reason"))
    error_rows.append(range_fail_df)

    # Recalculation mismatch (only when the inputs are available)
    recalc_fail_df = df.where(F.col("recalc_mismatch") & F.col("expected_total_dec").isNotNull()) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_num_dec").cast("string").alias("value"),
                F.lit("RECALC_MISMATCH").alias("error_type"),
                F.format_string("expected=%s, actual=%s, diff=%.4f",
                                F.col("expected_total_dec").cast("string"),
                                F.col("total_amount_num_dec").cast("string"),
                                F.col("recalc_abs_diff")).alias("error_reason"))
    error_rows.append(recalc_fail_df)

    errors_df = None
    if error_rows:
        errors_df = error_rows[0]
        for e in error_rows[1:]:
            errors_df = errors_df.unionByName(e, allowMissingColumns=True)

    # Consistency conflicts: one key carries multiple distinct amounts across
    # partitions/batches; compare distinct DECIMAL(18,2) representations
    consistency_conflicts_df = (df.where(F.col("total_amount_num_dec").isNotNull())
                                  .groupBy(*key_cols)
                                  .agg(F.countDistinct("total_amount_num_dec").alias("distinct_amounts"),
                                       F.collect_set("total_amount_num_dec").alias("amount_set"),
                                       *([F.collect_set(c).alias(f"{c}_set") for c in part_cols] if part_cols else []))
                                  .where(F.col("distinct_amounts") > 1))

    # Distribution stats for spike detection (percentile_approx; fall back to
    # DataFrame.approxQuantile on engines without the SQL function)
    dist_stats_df = (df.where(F.col("total_amount_num_dec").isNotNull())
                       .agg(F.count("*").alias("n"),
                            F.min("total_amount_num_dec").alias("min"),
                            F.expr("percentile_approx(total_amount_num_dec, 0.5)").alias("p50"),
                            F.expr("percentile_approx(total_amount_num_dec, 0.9)").alias("p90"),
                            F.expr("percentile_approx(total_amount_num_dec, 0.95)").alias("p95"),
                            F.expr("percentile_approx(total_amount_num_dec, 0.99)").alias("p99"),
                            F.max("total_amount_num_dec").alias("max")))

    # Diff report (summary) for recomputation mismatches
    diff_report_df = (df.where(F.col("expected_total_dec").isNotNull())
                        .agg(
                            F.sum(F.when(F.col("recalc_mismatch"), 1).otherwise(0)).alias("mismatch_cnt"),
                            F.count("*").alias("checked_cnt"),
                            F.avg(F.when(F.col("recalc_abs_diff").isNotNull(), F.col("recalc_abs_diff"))).alias("avg_abs_diff"),
                            F.max("recalc_abs_diff").alias("max_abs_diff")
                        ))

    # Valid records: pass all checks
    valid_df = (df.where(
                    (~F.col("is_null")) &
                    (~F.col("is_empty_str")) &
                    (~type_fail) &
                    (~F.col("range_underflow")) &
                    (~F.col("range_overflow")) &
                    (~F.col("recalc_mismatch") | F.col("expected_total_dec").isNull())  # pass when expected is not computable for lack of inputs
                )
                .withColumn("total_amount_std", F.col("total_amount_num_dec").cast(DECIMAL_TYPE))
                # Optional: drop helper columns
                .drop("total_amount_num_dbl", "total_amount_num_dec", "total_amount_clean_str",
                      "invalid_number", "numeric_like", "sci_notation_found",
                      "is_null", "is_empty_str", "range_underflow", "range_overflow",
                      "expected_total_dbl", "expected_total_dec", "recalc_abs_diff",
                      "recalc_mismatch", "in_sample", "total_amount_raw_str"))

    return {
        "valid_df": valid_df,
        "errors_df": errors_df,
        "consistency_conflicts_df": consistency_conflicts_df,
        "diff_report_df": diff_report_df,
        "dist_stats_df": dist_stats_df
    }

# -------------------------------
# Example usage (adjust I/O as needed)
# -------------------------------

df_raw = spark.read.format("delta").load("/data/warehouse/fact_order_item")

result = validate_total_amount(df_raw, total_col="total_amount", sample_fraction=0.2, seed=7)

result["valid_df"].write.mode("overwrite").format("delta").save("/data/validated/fact_order_item")

result["errors_df"].write.mode("overwrite").format("delta").save("/data/quality/fact_order_item_total_amount_errors")

result["consistency_conflicts_df"].write.mode("overwrite").format("delta").save("/data/quality/fact_order_item_total_amount_consistency")

result["diff_report_df"].show(truncate=False)

result["dist_stats_df"].show(truncate=False)

// Target column: stg_event_log.event_date
// Goal: a single event-date convention (ISO 8601 yyyy-MM-dd) with consistency and traceability.
// Validation types covered: non-null, format, range, and type matching.
// Each rule below is a Spark SQL boolean predicate (TRUE means the record passes).
// Rules with optional dependency columns (event_time, ingest_ts, dt, event_date_str)
// run only when the column exists.

object StgEventLogEventDateValidation {

  // Earliest acceptable date, and the future drift allowed across time zones
  val MinDate = "2018-01-01"
  val AllowedFutureDriftDays = 1

  // Rule structure (configuration-style definition)
  final case class ValidationRule(
    id: String,
    category: String,                    // non-null / format / range / type match / dependency consistency
    predicate: String,                   // Spark SQL predicate (must evaluate to TRUE)
    failureMsg: String,                  // failure description
    dependsOn: Seq[String] = Nil,        // required columns
    optionalDependsOn: Seq[String] = Nil // optional columns (checked only if present)
  )

  // Schema/metadata-level check (run once at job start)
  // Requirement: stg_event_log.event_date must be of type DATE
  // Pseudo-expression (implement as a schema check in the job):
  // require(df.schema("event_date").dataType == org.apache.spark.sql.types.DateType, "event_date must be DATE")
  val schemaRules: Seq[ValidationRule] = Seq(
    ValidationRule(
      id = "TYPE_MATCH_001",
      category = "数据类型匹配",
      predicate = "/* metadata check: event_date MUST be DATE type */ TRUE",
      failureMsg = "event_date 非 DATE 类型(必须为 DATE)"
    )
  )

  // Row-level rules (evaluated per record)
  val rowRules: Seq[ValidationRule] = Seq(

// Non-null
ValidationRule(
  id = "NOT_NULL_001",
  category = "非空",
  predicate =
    """event_date IS NOT NULL""",
  failureMsg = "event_date 为空(禁止 NULL)",
  dependsOn = Seq("event_date")
),
ValidationRule(
  id = "NOT_PLACEHOLDER_002",
  category = "非空",
  predicate =
    """date_format(event_date, 'yyyy-MM-dd') NOT IN ('0000-00-00', '9999-12-31')""",
  failureMsg = "event_date 为占位日期(0000-00-00 或 9999-12-31 禁止)",
  dependsOn = Seq("event_date")
),

// Format check (strict yyyy-MM-dd, no time or zone; if the source string
// column event_date_str exists, validate it strictly as a string)
ValidationRule(
  id = "FORMAT_STRICT_001",
  category = "格式校验",
  predicate =
    // When event_date_str (the source string) exists:
    // 1) it must match yyyy-MM-dd
    // 2) it must not contain time, zone, slash, or dot characters
    // 3) its parsed value must equal the stored event_date
    // If event_date_str is absent, only assert the stored date round-trips to
    // yyyy-MM-dd (trivially true; kept as a placeholder)
    """CASE
       WHEN event_date_str IS NOT NULL THEN
         event_date_str RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'
         AND NOT (event_date_str RLIKE '[/\\.]' OR event_date_str RLIKE '[Tt]' OR event_date_str RLIKE '(Z|\\+\\d{2}:?\\d{2})')
         AND to_date(event_date_str, 'yyyy-MM-dd') IS NOT NULL
         AND to_date(event_date_str, 'yyyy-MM-dd') = event_date
       ELSE TRUE
     END""",
  failureMsg = "事件日期格式不合规(必须严格 yyyy-MM-dd;不得含时间/时区/斜杠/点号;解析后须与 event_date 一致)",
  dependsOn = Seq("event_date"),
  optionalDependsOn = Seq("event_date_str")
),

// Illegal-date detection (e.g. 2024-02-30); the ETL parsing stage should set
// these to NULL and route them to the issue queue
ValidationRule(
  id = "ILLEGAL_DATE_002",
  category = "格式校验",
  predicate =
    """CASE
       WHEN event_date_str IS NOT NULL THEN to_date(event_date_str, 'yyyy-MM-dd') IS NOT NULL
       ELSE event_date IS NOT NULL
     END""",
  failureMsg = "无法解析为合法公历日期(如 2024-02-30 等)",
  dependsOn = Seq("event_date"),
  optionalDependsOn = Seq("event_date_str")
),

// Range check: 2018-01-01 through the current date (inclusive); +1 day of drift allowed
ValidationRule(
  id = "RANGE_001",
  category = "范围校验",
  predicate =
    """event_date >= to_date('2018-01-01') AND event_date <= date_add(current_date(), 1)""",
  failureMsg = "事件日期不在允许范围(2018-01-01 至当前日期,未来最多允许 +1 天)",
  dependsOn = Seq("event_date")
),

// Dependency consistency: when event_time exists, to_date(event_time) must equal event_date
ValidationRule(
  id = "CONSISTENCY_TIME_001",
  category = "依赖一致性",
  predicate =
    """CASE WHEN event_time IS NOT NULL THEN to_date(event_time) = event_date ELSE TRUE END""",
  failureMsg = "event_time 与 event_date 不一致(to_date(event_time) 应等于 event_date)",
  dependsOn = Seq("event_date"),
  optionalDependsOn = Seq("event_time")
),

// Dependency consistency: ingest_ts must not precede event_date
ValidationRule(
  id = "CONSISTENCY_INGEST_002",
  category = "依赖一致性",
  predicate =
    """CASE WHEN ingest_ts IS NOT NULL THEN to_date(ingest_ts) >= event_date ELSE TRUE END""",
  failureMsg = "ingest_ts 早于 event_date(不允许回溯入湖)",
  dependsOn = Seq("event_date"),
  optionalDependsOn = Seq("ingest_ts")
),

// Partition consistency: dt (the string partition column) must match event_date (yyyy-MM-dd)
ValidationRule(
  id = "PARTITION_MATCH_003",
  category = "依赖一致性",
  predicate =
    """CASE WHEN dt IS NOT NULL THEN dt = date_format(event_date, 'yyyy-MM-dd') ELSE TRUE END""",
  failureMsg = "分区字段 dt 与 event_date 不一致(dt 应等于 date_format(event_date,'yyyy-MM-dd'))",
  dependsOn = Seq("event_date"),
  optionalDependsOn = Seq("dt")
),

// Localized/anomalous input detection (routed to the issue queue): Chinese
// dates, localized month names, zero-padded/overlong years, etc.
ValidationRule(
  id = "LOCALE_VARIANT_004",
  category = "格式校验",
  predicate =
    """CASE
       WHEN event_date_str IS NOT NULL THEN
         NOT (event_date_str RLIKE '[年月日]' OR
              event_date_str RLIKE '(?i)Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec' OR
              event_date_str RLIKE '[/\\.]' OR
              event_date_str RLIKE '^\\d{5,}-')
       ELSE TRUE
     END""",
  failureMsg = "检测到本地化/变体日期格式(中文日期、英文月名、斜杠/点号或异常零前导年份)",
  optionalDependsOn = Seq("event_date_str")
)

) }

// Error handling and implementation notes (process guidance):
// 1) Ingestion-layer parsing: apply a strict date-parsing UDF to the source
//    string (event_date_str, if present):
//    - accept only yyyy-MM-dd; clean and reject variants containing time/zone
//      suffixes, slashes, dots, Chinese date characters, or English month names;
//    - parse illegal dates (e.g. 2024-02-30) to NULL; keep the original value for auditing;
//    - route samples with zero-padding anomalies or illegal months/days straight
//      to the issue queue with a repair suggestion.
// 2) Consistency job: run all rowRules before writing a partition; write failing
//    records to an error-sample table (rule_id, failureMsg, original value, repair suggestion).
// 3) Partition consistency: enforce dt = date_format(event_date,'yyyy-MM-dd')
//    before writing; block the write on conflict.
// 4) Audit trail: retain the raw string (event_date_str) alongside the parsed
//    result (event_date); record the parsing/cleaning UDF version and run batch in lineage.
// 5) Before a report goes live: require the issue queue to be empty, or a signed
//    waiver (count, reason, and remediation plan).
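The strict parsing described in step 1 can be sketched in Python (illustrative; the production version would live in a Spark UDF):

```python
import re
from datetime import date

STRICT_DATE = re.compile(r"^\d{4}-\d{2}-\d{2}$")

def parse_event_date(raw):
    """Accept only strict yyyy-MM-dd; reject localized variants;
    return None for the wrong shape or an illegal calendar date."""
    if raw is None or not STRICT_DATE.match(raw):
        return None  # wrong shape: time/zone suffixes, slashes, Chinese dates, month names...
    y, m, d = map(int, raw.split("-"))
    try:
        return date(y, m, d)  # rejects illegal dates such as 2024-02-30
    except ValueError:
        return None

assert parse_event_date("2024-02-29") == date(2024, 2, 29)  # valid leap day
assert parse_event_date("2024-02-30") is None               # illegal date
assert parse_event_date("2024/02/01") is None               # slashes rejected
assert parse_event_date("2024-02-01T00:00:00Z") is None     # time/zone rejected
```

Returning None (NULL) rather than raising keeps the original value available for the audit trail while the record joins the issue queue.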

Example details

Problems it solves

With a single efficient prompt, automatically generate executable data-validation rules, giving every column clear boundaries and traceable standards, and turning "data cleanliness" into a repeatable competitive advantage for the team.

  • Fast: enter a table name, column name, and target language; rules and check steps are generated in seconds.
  • Professional: authoritative, rigorous specifications and explanations from a "data engineering expert" perspective.
  • Comprehensive: covers format, value ranges, uniqueness, missing values, dependencies, and common error handling.
  • Actionable: includes check points and implementation advice, ready to apply in spreadsheets and data platforms.
  • Shareable: clearly structured output, suited to data dictionaries and team handbooks, reducing communication cost.
  • Cost-saving: markedly less rework, shorter report release cycles, and more trustworthy metrics.
  • Versatile: fits pre-release report validation, routine data governance, quality inspections, and compliance-audit preparation.

Target users

Data engineers

Quickly generate column-level validation rules for ingestion, transformation, and loading, with sample data and explanations, shortening release cycles and reducing rollbacks.

Data platform / data warehouse leads

Build a unified field-validation template, cover core tables in bulk, standardize data quality, and support audits and reviews.

Data analysts and BI developers

Define format and range limits for the fields behind key metrics, intercept dirty data early, reduce report rework, and speed up weekly and monthly deliverables.

Feature summary

Generates actionable validation rules for a specified table column in one step, quickly establishing a data-quality defense line that applies immediately to ingestion and cleaning.
Recognizes common field types and proposes range, format, and uniqueness checks automatically, cutting manual configuration time.
Outputs explanations and steps in Chinese or English on demand, easing cross-team communication and record-keeping and lowering rollout and training costs.
Provides sample data with pass/reject examples so rules can be self-tested quickly, avoiding repeated rework after release.
Generates strict or lenient rule variants per business scenario, enabling agile iteration that balances data quality against processing efficiency.
Presents each rule's purpose, scope, and caveats in a structured form, so new team members can understand and maintain them quickly.
Parameterizes table and column names so templates can be reused in bulk and standards applied uniformly across projects and environments.
Considers the processing-pipeline context, suggesting validation points at ingestion, transformation, and storage to contain problems early.
Outputs ready-to-use implementation guides and change lists that support fast reviews and audits, improving compliance and transparency.
Aligns with reporting and analytics goals, reducing the impact of dirty data on metrics and improving decision confidence and delivery cadence.

How to use a purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated from the template into your usual chat app (ChatGPT, Claude, etc.) and use it in conversation directly, with no extra development. Good for personal trials and lightweight use.

2. Publish it as an API

Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, enabling automation and batch processing. Good for developer integration and embedding in business systems.

3. Configure it in an MCP client

Point your MCP client at the corresponding server address so your AI application can invoke the prompt template automatically. Good for advanced users and team collaboration, letting prompts move seamlessly between AI tools.

AI prompt price
¥25.00
Try before you buy: pay only once it works for you.

What you get after purchase

The full prompt template
- 286 tokens
- 6 adjustable parameters
{ 表格名称 } { 列名称 } { 数据验证类型 } { 输出语言 } { 校验规则说明 } { 是否生成示例代码 }
Usage rights to community-contributed content
- Curated community examples to help you get started quickly