热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
为指定表格列生成专业数据验证规则,确保数据准确性和一致性,适用于数据清洗、转换和管道构建场景,提升数据质量和数据工程任务执行效率。
-- Goal: make dw_user_profile.email usable, unique, and auditable.
-- Assumes PostgreSQL 12+ (expression indexes and generated columns are used).
-- Other engines can adapt the same approach.

-- 1) Normalization/cleansing UDF: trim + lowercase; strip control characters
--    and whitespace; fold common full-width variants and CJK punctuation.
CREATE SCHEMA IF NOT EXISTS dw;
-- Normalize a raw email string: trim, lowercase, remove whitespace/control
-- characters, fold full-width symbols, drop CJK punctuation.
-- Returns NULL when the result is empty.
-- Declared IMMUTABLE so it can back a generated column and expression index.
CREATE OR REPLACE FUNCTION dw.email_norm(raw text)
RETURNS text
LANGUAGE plpgsql
IMMUTABLE STRICT
AS $$
DECLARE
    s text;
BEGIN
    -- Base normalization: strip surrounding whitespace, lowercase.
    s := lower(btrim(raw));
    -- Convert the full-width (ideographic) space to an ASCII space, then
    -- remove all whitespace (email addresses may not contain whitespace).
    -- NOTE: the extracted original had identical source/target literals here,
    -- making both this replace and the translate below no-ops.
    s := replace(s, '　', ' ');
    s := regexp_replace(s, '\s+', '', 'g');
    -- Strip control characters (including invisible ones).
    s := regexp_replace(s, '[[:cntrl:]]', '', 'g');
    -- Fold common full-width symbols to half-width: @ . - + _ %
    s := translate(s, '＠．－＋＿％', '@.-+_%');
    -- Drop common CJK punctuation accidentally written into the value.
    s := regexp_replace(s, '[,、。;:]', '', 'g');
    -- Treat an empty string after normalization as NULL.
    RETURN NULLIF(s, '');
END;
$$;
-- 2) Generated column: persist the normalized value for indexing and audit.
--    Relies on dw.email_norm being declared IMMUTABLE.
ALTER TABLE dw_user_profile
    ADD COLUMN IF NOT EXISTS email_norm TEXT
    GENERATED ALWAYS AS (dw.email_norm(email)) STORED;
-- 3) Data type: store as VARCHAR with UTF-8 encoding (PostgreSQL default).
--    254 is the practical upper bound for an email address length.
ALTER TABLE dw_user_profile
    ALTER COLUMN email TYPE VARCHAR(254);
-- 4) Non-empty check: forbid NULL, empty/whitespace-only values, and the
--    placeholder texts 'null' / 'n/a'.
--    NOTE: PostgreSQL has no ADD CONSTRAINT IF NOT EXISTS; drop-then-add
--    keeps the migration idempotent.
ALTER TABLE dw_user_profile
    DROP CONSTRAINT IF EXISTS chk_email_nonempty;
ALTER TABLE dw_user_profile
    ADD CONSTRAINT chk_email_nonempty
    CHECK (email_norm IS NOT NULL AND email_norm NOT IN ('null', 'n/a'));
-- 5) Format check:
--    - exactly one @
--    - local part: [a-z0-9_%+-] segments separated by single dots;
--      no leading/trailing dot, no consecutive dots
--    - domain: dot-separated labels of alphanumerics and hyphens
--      (hyphen not at label start/end); subdomains allowed
--    - top-level domain: 2-15 letters
--    email_norm is already lowercased, so lowercase classes suffice.
--    Stricter checks (IDN transcoding, MX lookup) belong in the ETL job.
ALTER TABLE dw_user_profile
    DROP CONSTRAINT IF EXISTS chk_email_format;
ALTER TABLE dw_user_profile
    ADD CONSTRAINT chk_email_format
    CHECK (email_norm ~ '^[a-z0-9_%+-]+(\.[a-z0-9_%+-]+)*@([a-z0-9]([a-z0-9-]*[a-z0-9])?\.)+[a-z]{2,15}$');
-- 6) Uniqueness: table-wide, case- and whitespace-insensitive, enforced on
--    the normalized generated column.
CREATE UNIQUE INDEX IF NOT EXISTS ux_dw_user_profile_email_norm
    ON dw_user_profile (email_norm);
-- 7) Dependency consistency (when an email_verified flag exists):
--    unverified emails may exist, but a record must not be marked verified
--    while its normalized email is missing. The original
--    "email_verified IS NULL OR email_verified IN (TRUE, FALSE)" was a
--    tautology for a boolean column and enforced nothing.
ALTER TABLE dw_user_profile
    DROP CONSTRAINT IF EXISTS chk_email_verified_consistency;
ALTER TABLE dw_user_profile
    ADD CONSTRAINT chk_email_verified_consistency
    CHECK (email_verified IS NOT TRUE OR email_norm IS NOT NULL);
-- 8) Quality view: surface problem samples (NULL / placeholder / bad format).
--    Uses the same regex as chk_email_format so view and constraint agree.
CREATE OR REPLACE VIEW dw.dq_vw_invalid_email AS
SELECT
    id,
    email,
    email_norm,
    CASE
        WHEN email_norm IS NULL THEN '空值/仅空白'
        WHEN email_norm IN ('null', 'n/a') THEN '占位文本'
        WHEN email_norm !~ '^[a-z0-9_%+-]+(\.[a-z0-9_%+-]+)*@([a-z0-9]([a-z0-9-]*[a-z0-9])?\.)+[a-z]{2,15}$' THEN '格式不符'
        ELSE '未知'
    END AS issue
FROM dw_user_profile
WHERE email_norm IS NULL
   OR email_norm IN ('null', 'n/a')
   OR email_norm !~ '^[a-z0-9_%+-]+(\.[a-z0-9_%+-]+)*@([a-z0-9]([a-z0-9-]*[a-z0-9])?\.)+[a-z]{2,15}$';
-- 9) Duplicate-detection view: emails that collide after normalization.
--    ordered_ids places the newest active record first for the repair job.
--    (Original used COUNT() twice, which is invalid SQL; must be COUNT(*).)
CREATE OR REPLACE VIEW dw.dq_vw_duplicate_email AS
SELECT
    email_norm,
    COUNT(*) AS dup_count,
    ARRAY_AGG(id ORDER BY COALESCE(is_active, FALSE) DESC,
                          COALESCE(updated_at, 'epoch'::timestamp) DESC) AS ordered_ids
FROM dw_user_profile
WHERE email_norm IS NOT NULL
GROUP BY email_norm
HAVING COUNT(*) > 1;
-- 10) Example repair job: keep the newest active record per duplicate group,
--     archive the rest, and write the audit trail in the same statement.
--     Requires dw.audit_log(entity, entity_id, action, details, occurred_at).
--     NOTE: the original ran the INSERT as a second statement where the CTE
--     to_archive no longer existed; a data-modifying CTE with RETURNING
--     performs the update and the audit insert atomically.
WITH dup AS (
    SELECT
        email_norm,
        ARRAY_AGG(id ORDER BY COALESCE(is_active, FALSE) DESC,
                              COALESCE(updated_at, 'epoch'::timestamp) DESC) AS ids
    FROM dw_user_profile
    WHERE email_norm IS NOT NULL
    GROUP BY email_norm
    HAVING COUNT(*) > 1
),
to_archive AS (
    -- ids[1] is the record to keep; everything after it gets archived.
    SELECT email_norm, unnest(ids[2:]) AS id, ids[1] AS kept_id
    FROM dup
),
archived AS (
    UPDATE dw_user_profile u
    SET is_active = FALSE,
        status = 'archived_duplicate',
        updated_at = NOW()
    FROM to_archive t
    WHERE u.id = t.id
    RETURNING t.id, t.email_norm, t.kept_id
)
INSERT INTO dw.audit_log (entity, entity_id, action, details, occurred_at)
SELECT
    'dw_user_profile',
    a.id,
    'duplicate_email_archived',
    jsonb_build_object('email_norm', a.email_norm, 'kept_id', a.kept_id, 'archived_id', a.id),
    NOW()
FROM archived a;
-- 11) Optional pre-write cleansing trigger: trim + lowercase the raw email
--     column on every write, for stronger at-source consistency.
CREATE OR REPLACE FUNCTION dw.enforce_email_canonical()
RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
    IF NEW.email IS NOT NULL THEN
        NEW.email := lower(btrim(NEW.email));
    END IF;
    RETURN NEW;
END;
$$;
-- Recreate the trigger idempotently (EXECUTE FUNCTION requires PG 11+).
DROP TRIGGER IF EXISTS trg_dw_user_profile_email_canonical ON dw_user_profile;
CREATE TRIGGER trg_dw_user_profile_email_canonical
    BEFORE INSERT OR UPDATE ON dw_user_profile
    FOR EACH ROW
    EXECUTE FUNCTION dw.enforce_email_canonical();
from typing import Dict, List, Optional
from pyspark.sql import SparkSession, DataFrame, functions as F, types as T
# Shared Spark session for the validation job.
spark = SparkSession.builder.getOrCreate()

# Business upper bound for a single order-line amount (inclusive).
AMOUNT_UPPER_LIMIT = 1_000_000.0
# Allowed absolute difference when comparing stored vs. recomputed totals.
AMOUNT_TOLERANCE = 0.01
# Canonical storage type for monetary amounts.
DECIMAL_TYPE = T.DecimalType(18, 2)
def has_cols(df: DataFrame, cols: List[str]) -> bool:
    """Return True if every name in ``cols`` is a column of ``df``."""
    return all(c in df.columns for c in cols)
def first_existing(df: DataFrame, candidates: List[str]) -> Optional[str]:
    """Return the first name in ``candidates`` present in ``df.columns``, else None."""
    for c in candidates:
        if c in df.columns:
            return c
    return None
def normalize_total_amount(df_raw: DataFrame, total_col: str = "total_amount") -> DataFrame:
    """Normalize a raw monetary column to DECIMAL(18,2).

    Steps:
      1) Strip thousand separators and currency symbols.
      2) Forbid scientific notation; flag values containing e/E.
      3) Enforce the plain-decimal pattern ``^-?\\d+(\\.\\d+)?$``.
      4) Cast to DOUBLE, round to 2 decimals, cast to DECIMAL(18,2).
         (Spark ``round`` is half-up — TODO confirm for the deployed version.)

    Returns ``df_raw`` with added columns:
      - total_amount_raw_str:   raw value as string (for diagnostics)
      - total_amount_clean_str: value after symbol/separator removal
      - total_amount_num_dec:   normalized DECIMAL(18,2); NULL when invalid
      - flags: sci_notation_found, numeric_like, invalid_number
    """
    # Capture the raw string once for diagnostics and error reporting.
    df = df_raw.withColumn("total_amount_raw_str", F.col(total_col).cast("string"))
    # Detect scientific notation up front — it is forbidden for amounts.
    df = df.withColumn(
        "sci_notation_found",
        F.when(
            F.col("total_amount_raw_str").rlike("(?i)[+-]?[0-9]*\\.?[0-9]+[eE][+-]?[0-9]+"),
            F.lit(True),
        ).otherwise(F.lit(False)),
    )
    # Remove common currency symbols, thousand separators and whitespace,
    # then keep only digits, dot and minus.
    df = df.withColumn(
        "total_amount_clean_str",
        F.regexp_replace(F.col("total_amount_raw_str"), r"[,\s¥¥$€£]", ""),
    ).withColumn(
        "total_amount_clean_str",
        F.regexp_replace(F.col("total_amount_clean_str"), r"[^0-9\.\-]", ""),
    )
    # Validate the simple decimal pattern (scientific notation excluded).
    numeric_pattern = r"^-?\d+(\.\d+)?$"
    df = df.withColumn(
        "numeric_like",
        F.when(F.col("total_amount_clean_str").rlike(numeric_pattern), F.lit(True)).otherwise(F.lit(False)),
    )
    # Invalid when not numeric-like after cleaning, or sci notation was used.
    df = df.withColumn(
        "invalid_number",
        (~F.col("numeric_like")) | F.col("sci_notation_found"),
    )
    # Cast to double, round to 2 decimals, then cast to DECIMAL(18,2).
    # Invalid rows stay NULL so they can be reported downstream.
    df = df.withColumn(
        "total_amount_num_dbl",
        F.when(~F.col("invalid_number"), F.col("total_amount_clean_str").cast("double"))
         .otherwise(F.lit(None).cast("double")),
    )
    df = df.withColumn(
        "total_amount_num_dec",
        F.when(
            F.col("total_amount_num_dbl").isNotNull(),
            F.round(F.col("total_amount_num_dbl"), 2).cast(DECIMAL_TYPE),
        ).otherwise(F.lit(None).cast(DECIMAL_TYPE)),
    )
    return df
def validate_total_amount(df_raw: DataFrame,
                          total_col: str = "total_amount",
                          sample_fraction: float = 0.1,
                          seed: int = 42) -> Dict[str, DataFrame]:
    """
    Apply validation rules on fact_order_item.total_amount.

    Returns a dict with:
      - valid_df: records passing all checks; amount normalized to
        DECIMAL(18,2) in column ``total_amount_std``
      - errors_df: row-level error details (an input row can appear once per
        failed rule)
      - consistency_conflicts_df: natural keys whose amount differs across
        partitions/batches
      - diff_report_df: summary statistics of recomputation differences
      - dist_stats_df: distribution statistics for spike detection

    NOTE(review): assumes an ``order_id`` column exists for the sampling
    join — confirm against the upstream schema.
    """
    df = normalize_total_amount(df_raw, total_col)
    # Empty-string check, applied to the raw string capture.
    df = df.withColumn("is_empty_str", F.trim(F.col("total_amount_raw_str")) == "")
    # NULL check on the original column.
    df = df.withColumn("is_null", F.col(total_col).isNull())
    # Type failure: non-numeric after cleaning, or the decimal cast was NULL.
    type_fail = (F.col("invalid_number") | F.col("total_amount_num_dec").isNull())
    # Range check: 0 <= amount <= AMOUNT_UPPER_LIMIT.
    df = df.withColumn("range_underflow", F.col("total_amount_num_dec") < F.lit(0).cast(DECIMAL_TYPE))
    df = df.withColumn("range_overflow", F.col("total_amount_num_dec") > F.lit(AMOUNT_UPPER_LIMIT).cast(DECIMAL_TYPE))
    # Recompute the expected amount from quantity * unit_price, adjusted by
    # discount/tax when those columns exist.
    q = F.col("quantity").cast("double") if "quantity" in df.columns else F.lit(None).cast("double")
    p = F.col("unit_price").cast("double") if "unit_price" in df.columns else F.lit(None).cast("double")
    has_disc = "discount_amount" in df.columns
    has_tax = "tax_amount" in df.columns
    d = F.coalesce(F.col("discount_amount").cast("double"), F.lit(0.0)) if has_disc else F.lit(0.0)
    t = F.coalesce(F.col("tax_amount").cast("double"), F.lit(0.0)) if has_tax else F.lit(0.0)
    base = q * p
    # When both adjustment columns exist and are NULL for a row, use the bare
    # base; otherwise apply base - discount + tax (missing columns become 0).
    expected_expr = F.when(
        ((F.col("discount_amount").isNull()) & (F.col("tax_amount").isNull())) if (has_disc and has_tax) else F.lit(False),
        base
    ).otherwise(base - d + t)
    # Only computable when both quantity and unit_price are present.
    expected_expr = F.when(q.isNotNull() & p.isNotNull(), expected_expr).otherwise(F.lit(None).cast("double"))
    df = df.withColumn("expected_total_dbl", expected_expr)
    df = df.withColumn("expected_total_dec",
                       F.when(F.col("expected_total_dbl").isNotNull(),
                              F.round(F.col("expected_total_dbl"), 2).cast(DECIMAL_TYPE))
                       .otherwise(F.lit(None).cast(DECIMAL_TYPE)))
    # Absolute difference computed in double for the tolerance comparison.
    df = df.withColumn("recalc_abs_diff",
                       F.when(F.col("expected_total_dec").isNotNull() & F.col("total_amount_num_dec").isNotNull(),
                              F.abs(F.col("total_amount_num_dec").cast("double") - F.col("expected_total_dec").cast("double")))
                       .otherwise(F.lit(None).cast("double")))
    df = df.withColumn("recalc_mismatch",
                       F.when(F.col("recalc_abs_diff").isNotNull(), F.col("recalc_abs_diff") > F.lit(AMOUNT_TOLERANCE))
                       .otherwise(F.lit(False)))
    # Sample-based verification flag (cost control in batch). The sampled
    # order_ids are deduplicated before the join: joining the raw sample
    # would duplicate rows for orders with several items, and the original
    # ``.drop("s.order_id")`` on a qualified name was a no-op.
    sample_keys = (df.sample(withReplacement=False, fraction=sample_fraction, seed=seed)
                     .select(F.col("order_id").alias("_sample_order_id"))
                     .distinct())
    df = (df.join(sample_keys, df["order_id"] == sample_keys["_sample_order_id"], "left")
            .withColumn("in_sample", F.col("_sample_order_id").isNotNull())
            .drop("_sample_order_id"))
    # We keep full mismatch computation for rows outside the sample; to flag
    # strictly within the sample, uncomment:
    # df = df.withColumn("recalc_mismatch", F.when(F.col("in_sample"), F.col("recalc_mismatch")).otherwise(F.lit(False)))
    # Natural key for cross-batch consistency checks: first item-level key
    # combination that exists, else fall back to order_id.
    key_cols = []
    for candidates in [["order_id", "order_item_id"], ["order_id", "item_id"], ["order_id", "sku_id"]]:
        if has_cols(df, candidates):
            key_cols = candidates
            break
    if not key_cols:
        key_cols = ["order_id"]  # fallback when no item-level key is present
    # Batch/partition identifier columns available for diagnostics.
    part_cols = [c for c in ["batch_id", "ingest_date", "etl_date", "dt", "partition_date"] if c in df.columns]
    # Collect row-level error frames, one per rule family.
    error_rows = []
    # Non-null / empty-string failures.
    nonnull_fail_df = df.where(F.col("is_null") | F.col("is_empty_str")) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_raw_str").alias("value"),
                F.lit("NON_NULL").alias("error_type"),
                F.when(F.col("is_null"), F.lit("total_amount is NULL"))
                 .when(F.col("is_empty_str"), F.lit("total_amount is empty string"))
                 .otherwise(F.lit("unknown")).alias("error_reason"))
    error_rows.append(nonnull_fail_df)
    # Type failures (invalid number or decimal cast failed).
    type_fail_df = df.where(type_fail) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_raw_str").alias("value"),
                F.lit("TYPE_MISMATCH").alias("error_type"),
                F.when(F.col("sci_notation_found"), F.lit("scientific notation forbidden"))
                 .when(F.col("invalid_number"), F.lit("non-numeric after cleaning"))
                 .otherwise(F.lit("cast to DECIMAL(18,2) failed")).alias("error_reason"))
    error_rows.append(type_fail_df)
    # Range failures.
    range_fail_df = df.where(F.col("range_underflow") | F.col("range_overflow")) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_raw_str").alias("value"),
                F.lit("RANGE").alias("error_type"),
                F.when(F.col("range_underflow"), F.lit("amount < 0 not allowed"))
                 .when(F.col("range_overflow"), F.lit("amount > 1,000,000 requires review"))
                 .otherwise(F.lit("range violation")).alias("error_reason"))
    error_rows.append(range_fail_df)
    # Recalculation mismatch (only for rows where expected was computable).
    recalc_fail_df = df.where(F.col("recalc_mismatch") & F.col("expected_total_dec").isNotNull()) \
        .select(*(key_cols + part_cols),
                F.col("total_amount_num_dec").cast("string").alias("value"),
                F.lit("RECALC_MISMATCH").alias("error_type"),
                F.format_string("expected=%s, actual=%s, diff=%.4f",
                                F.col("expected_total_dec").cast("string"),
                                F.col("total_amount_num_dec").cast("string"),
                                F.col("recalc_abs_diff")).alias("error_reason"))
    error_rows.append(recalc_fail_df)
    # Union all error frames into a single long-format error table.
    errors_df = None
    if error_rows:
        errors_df = error_rows[0]
        for e in error_rows[1:]:
            errors_df = errors_df.unionByName(e, allowMissingColumns=True)
    # Consistency conflicts: one natural key carrying multiple distinct
    # DECIMAL(18,2) amounts across partitions/batches.
    cons_group = (df.where(F.col("total_amount_num_dec").isNotNull())
                  .groupBy(*key_cols)
                  .agg(F.countDistinct("total_amount_num_dec").alias("distinct_amounts"),
                       F.collect_set("total_amount_num_dec").alias("amount_set"),
                       *([F.collect_set(c).alias(f"{c}_set") for c in part_cols] if part_cols else []))
                  .where(F.col("distinct_amounts") > 1))
    consistency_conflicts_df = cons_group
    # Distribution statistics for spike detection (approximate percentiles).
    dist_stats_df = (df.where(F.col("total_amount_num_dec").isNotNull())
                     .agg(F.count("*").alias("n"),
                          F.min("total_amount_num_dec").alias("min"),
                          F.expr("percentile_approx(total_amount_num_dec, 0.5)").alias("p50"),
                          F.expr("percentile_approx(total_amount_num_dec, 0.9)").alias("p90"),
                          F.expr("percentile_approx(total_amount_num_dec, 0.95)").alias("p95"),
                          F.expr("percentile_approx(total_amount_num_dec, 0.99)").alias("p99"),
                          F.max("total_amount_num_dec").alias("max")))
    # Summary report of recomputation mismatches.
    diff_report_df = (df.where(F.col("expected_total_dec").isNotNull())
                      .agg(
                          F.sum(F.when(F.col("recalc_mismatch"), 1).otherwise(0)).alias("mismatch_cnt"),
                          F.count("*").alias("checked_cnt"),
                          F.avg(F.when(F.col("recalc_abs_diff").isNotNull(), F.col("recalc_abs_diff"))).alias("avg_abs_diff"),
                          F.max("recalc_abs_diff").alias("max_abs_diff")
                      ))
    # Valid records: pass every check; recalc mismatch is waived when the
    # expected amount could not be computed (missing inputs).
    valid_df = (df.where(
        (~F.col("is_null")) &
        (~F.col("is_empty_str")) &
        (~type_fail) &
        (~F.col("range_underflow")) &
        (~F.col("range_overflow")) &
        (~F.col("recalc_mismatch") | F.col("expected_total_dec").isNull())
    )
        .withColumn("total_amount_std", F.col("total_amount_num_dec").cast(DECIMAL_TYPE))
        # Drop helper columns before handing the frame downstream.
        .drop("total_amount_num_dbl", "total_amount_num_dec", "total_amount_clean_str",
              "invalid_number", "numeric_like", "sci_notation_found",
              "is_null", "is_empty_str", "range_underflow", "range_overflow",
              "expected_total_dbl", "expected_total_dec", "recalc_abs_diff",
              "recalc_mismatch", "in_sample", "total_amount_raw_str"))
    return {
        "valid_df": valid_df,
        "errors_df": errors_df,
        "consistency_conflicts_df": consistency_conflicts_df,
        "diff_report_df": diff_report_df,
        "dist_stats_df": dist_stats_df
    }
// 验证对象:stg_event_log.event_date
// 目标:统一事件日期口径(ISO 8601 yyyy-MM-dd),保证一致性与可追踪性。
// 数据验证类型覆盖:非空、格式校验、范围校验、数据类型匹配。
// 说明:下面每条规则以 Spark SQL 布尔谓词表示(为真视为通过)。
// 带有可选依赖列(如 event_time、ingest_ts、dt、event_date_str)的规则仅在该列存在时执行。
object StgEventLogEventDateValidation {

  // Earliest acceptable event date, and future drift allowed to absorb
  // cross-timezone ingestion.
  val MinDate = "2018-01-01"
  val AllowedFutureDriftDays = 1

  // Configuration-style rule definition.
  final case class ValidationRule(
      id: String,
      category: String,                      // not-null / format / range / type-match / dependency-consistency
      predicate: String,                     // Spark SQL predicate; TRUE means the row passes
      failureMsg: String,                    // human-readable failure description
      dependsOn: Seq[String] = Nil,          // required columns
      optionalDependsOn: Seq[String] = Nil   // optional columns (checked only when present)
  )

  // Schema/metadata-level checks (run once at job start).
  // Requirement: stg_event_log.event_date must be of DATE type.
  // Implement as a schema assertion in the job, e.g.:
  //   require(df.schema("event_date").dataType ==
  //           org.apache.spark.sql.types.DateType, "event_date must be DATE")
  val schemaRules: Seq[ValidationRule] = Seq(
    ValidationRule(
      id = "TYPE_MATCH_001",
      category = "数据类型匹配",
      predicate = "/* metadata check: event_date MUST be DATE type */ TRUE",
      failureMsg = "event_date 非 DATE 类型(必须为 DATE)"
    )
  )

  // Row-level rules, evaluated per record.
  val rowRules: Seq[ValidationRule] = Seq(
    // Not null
    ValidationRule(
      id = "NOT_NULL_001",
      category = "非空",
      predicate =
        """event_date IS NOT NULL""",
      failureMsg = "event_date 为空(禁止 NULL)",
      dependsOn = Seq("event_date")
    ),
    // Placeholder dates are forbidden
    ValidationRule(
      id = "NOT_PLACEHOLDER_002",
      category = "非空",
      predicate =
        """date_format(event_date, 'yyyy-MM-dd') NOT IN ('0000-00-00', '9999-12-31')""",
      failureMsg = "event_date 为占位日期(0000-00-00 或 9999-12-31 禁止)",
      dependsOn = Seq("event_date")
    ),
    // Strict format check (yyyy-MM-dd only, no time/zone). When the source
    // string column event_date_str exists:
    //   1) must match yyyy-MM-dd,
    //   2) must not contain time, timezone, slash or dot characters,
    //   3) its parsed value must equal the landed event_date.
    // Without event_date_str this rule is trivially TRUE (kept as a placeholder).
    ValidationRule(
      id = "FORMAT_STRICT_001",
      category = "格式校验",
      predicate =
        """CASE
             WHEN event_date_str IS NOT NULL THEN
               event_date_str RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'
               AND NOT (event_date_str RLIKE '[/\\.]' OR event_date_str RLIKE '[Tt]' OR event_date_str RLIKE '(Z|\\+\\d{2}:?\\d{2})')
               AND to_date(event_date_str, 'yyyy-MM-dd') IS NOT NULL
               AND to_date(event_date_str, 'yyyy-MM-dd') = event_date
             ELSE TRUE
           END""",
      failureMsg = "事件日期格式不合规(必须严格 yyyy-MM-dd;不得含时间/时区/斜杠/点号;解析后须与 event_date 一致)",
      dependsOn = Seq("event_date"),
      optionalDependsOn = Seq("event_date_str")
    ),
    // Illegal calendar dates (e.g. 2024-02-30): the ETL parse stage should
    // null them out and route the record to the problem queue.
    ValidationRule(
      id = "ILLEGAL_DATE_002",
      category = "格式校验",
      predicate =
        """CASE
             WHEN event_date_str IS NOT NULL THEN to_date(event_date_str, 'yyyy-MM-dd') IS NOT NULL
             ELSE event_date IS NOT NULL
           END""",
      failureMsg = "无法解析为合法公历日期(如 2024-02-30 等)",
      dependsOn = Seq("event_date"),
      optionalDependsOn = Seq("event_date_str")
    ),
    // Range: MinDate through today (inclusive), plus the allowed future drift.
    // Interpolated from the constants above so they stay the single source
    // of truth (the original hard-coded the same values in the SQL string).
    ValidationRule(
      id = "RANGE_001",
      category = "范围校验",
      predicate =
        s"""event_date >= to_date('$MinDate') AND event_date <= date_add(current_date(), $AllowedFutureDriftDays)""",
      failureMsg = "事件日期不在允许范围(2018-01-01 至当前日期,未来最多允许 +1 天)",
      dependsOn = Seq("event_date")
    ),
    // Dependency consistency: when event_time exists, its date part must
    // equal event_date.
    ValidationRule(
      id = "CONSISTENCY_TIME_001",
      category = "依赖一致性",
      predicate =
        """CASE WHEN event_time IS NOT NULL THEN to_date(event_time) = event_date ELSE TRUE END""",
      failureMsg = "event_time 与 event_date 不一致(to_date(event_time) 应等于 event_date)",
      dependsOn = Seq("event_date"),
      optionalDependsOn = Seq("event_time")
    ),
    // Dependency consistency: ingestion must not precede the event date.
    ValidationRule(
      id = "CONSISTENCY_INGEST_002",
      category = "依赖一致性",
      predicate =
        """CASE WHEN ingest_ts IS NOT NULL THEN to_date(ingest_ts) >= event_date ELSE TRUE END""",
      failureMsg = "ingest_ts 早于 event_date(不允许回溯入湖)",
      dependsOn = Seq("event_date"),
      optionalDependsOn = Seq("ingest_ts")
    ),
    // Partition consistency: the string partition column dt must equal
    // event_date formatted as yyyy-MM-dd.
    ValidationRule(
      id = "PARTITION_MATCH_003",
      category = "依赖一致性",
      predicate =
        """CASE WHEN dt IS NOT NULL THEN dt = date_format(event_date, 'yyyy-MM-dd') ELSE TRUE END""",
      failureMsg = "分区字段 dt 与 event_date 不一致(dt 应等于 date_format(event_date,'yyyy-MM-dd'))",
      dependsOn = Seq("event_date"),
      optionalDependsOn = Seq("dt")
    ),
    // Localized/abnormal input detection (routes to the problem queue):
    // CJK date markers, English month names, slash/dot separators, or
    // years with 5+ digits.
    ValidationRule(
      id = "LOCALE_VARIANT_004",
      category = "格式校验",
      predicate =
        """CASE
             WHEN event_date_str IS NOT NULL THEN
               NOT (event_date_str RLIKE '[年月日]' OR
                    event_date_str RLIKE '(?i)Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Sept|Oct|Nov|Dec' OR
                    event_date_str RLIKE '[/\\.]' OR
                    event_date_str RLIKE '^\d{5,}-')
             ELSE TRUE
           END""",
      failureMsg = "检测到本地化/变体日期格式(中文日期、英文月名、斜杠/点号或异常零前导年份)",
      optionalDependsOn = Seq("event_date_str")
    )
  )
}
// 异常处理与实施建议(流程性说明):
// 1) 入湖层解析:对源字符串(若存在,如 event_date_str)应用严格日期解析 UDF:
//    - 仅接受 yyyy-MM-dd;清理并拒绝包含时间/时区/斜杠/点号/中文/英文月名的变体;
//    - 非法日期(如 2024-02-30)解析为 NULL;保留原始值以便审计;
//    - 对存在零前导、非法月份/日的样本直接入问题队列,附修复建议。
// 2) 一致性检查作业:在写入分区前执行 rowRules 全量校验;对失败记录输出到异常样本表(含 rule_id、failureMsg、原值、修复建议)。
// 3) 分区一致性:写前强制 dt = date_format(event_date,'yyyy-MM-dd');若冲突则拦截写入。
// 4) 审计追踪:保留 event_date 原始字符串(event_date_str)与解析结果(event_date),链路中记录解析/清洗 UDF 版本与运行批次。
// 5) 报表上线前:要求异常队列清空或签署豁免(明确数量、原因与治理计划)。
用一条高效提示词,自动生成可执行的数据验证规则,让每一列数据都有清晰边界与可追溯标准,帮助团队把“数据干净度”变成可复制的竞争力。
在导入、转换、落库环节快速生成列级校验规则,配套示例数据与说明,缩短上线周期并减少回滚。
搭建统一的字段校验模板,批量覆盖核心表,提升数据质量标准化,支持审计与走查。
为指标口径关键字段制定格式与范围限制,提前拦截脏数据,减少报表返工,加快周报月报交付。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期