热门角色不仅是灵感来源,更是你的效率助手。通过精挑细选的角色提示词,你可以快速生成高质量内容、提升创作灵感,并找到最契合你需求的解决方案。让创作更轻松,让价值更直接!
我们根据不同用户需求,持续更新角色库,让你总能找到合适的灵感入口。
为指定数据表的特定列,生成专业、可执行的数据验证规则。适用于数据清洗、ETL管道构建、数据质量监控等场景,通过自定义验证类型与详细说明,确保数据的一致性与准确性,提升数据工程效率。
主题:fact_orders.order_date 日期格式校验规则与检查步骤(格式校验)
一、验证规则(严格执行)
二、检查流程与判定逻辑 步骤顺序(先 trim 再校验),并输出违规类型 violation_type:
三、统计输出
四、异常处理与阻断策略
五、示例代码(可直接用于离线批处理)
A. PySpark 实现(推荐用于数据湖/仓常规批校验) 说明:
from pyspark.sql import functions as F, types as T
# --- Configuration --------------------------------------------------------
SOURCE_TABLE = "fact_orders"
KEY_COL = "order_id"
DATE_COL = "order_date"  # may be STRING/DATE/TIMESTAMP in the source table
BAD_TABLE = "bad_records.fact_orders_order_date"

# Processing day: the pipeline's configured run date if set, else today.
PROCESSING_DAY = F.to_date(F.lit(spark.conf.get("pipeline.run_date", "")), "yyyy-MM-dd")
PROCESSING_DAY = F.coalesce(PROCESSING_DAY, F.current_date())
BATCH_TS = F.current_timestamp()

# --- Normalize: cast to string, then trim (trim BEFORE validating) --------
df = spark.table(SOURCE_TABLE).select(KEY_COL, DATE_COL)
df1 = (
    df
    .withColumn("order_date_raw", F.col(DATE_COL).cast("string"))
    .withColumn("order_date_trim", F.trim(F.col("order_date_raw")))
)

# --- Validation predicates -------------------------------------------------
regex = r'^\d{4}-\d{2}-\d{2}$'
placeholders = F.lower(F.col("order_date_trim")).isin("n/a", "na", "null", "none", "-", "—")
is_blank = (F.col("order_date_trim").isNull()) | (F.col("order_date_trim") == "")
regex_ok = F.col("order_date_trim").rlike(regex)
# to_date returns NULL for strings that match the pattern but are not real
# calendar dates (e.g. 2024-02-30), which drives INVALID_CALENDAR_DATE below.
parsed_date = F.to_date(F.col("order_date_trim"), "yyyy-MM-dd")
in_range = (parsed_date >= F.to_date(F.lit("2000-01-01"))) & (parsed_date <= F.to_date(F.lit("2099-12-31")))
not_future = parsed_date <= PROCESSING_DAY

# First matching rule wins; rows passing every check get NULL (= valid).
violation_type = (
    F.when(is_blank, F.lit("NULL_OR_BLANK"))
    .when(placeholders, F.lit("PLACEHOLDER"))
    .when(~regex_ok, F.lit("INVALID_FORMAT"))
    .when(parsed_date.isNull(), F.lit("INVALID_CALENDAR_DATE"))
    .when(~in_range, F.lit("OUT_OF_RANGE"))
    .when(~not_future, F.lit("FUTURE_DATE"))
)

df2 = df1.withColumn("violation_type", violation_type)

# FIX: the original started `.select(` on a new line after the `where(...)`
# call without enclosing parentheses — a SyntaxError in Python.
bad_df = (
    df2.where(F.col("violation_type").isNotNull())
    .select(
        F.col(KEY_COL).alias("order_id"),
        F.col("order_date_raw"),
        F.col("violation_type"),
        BATCH_TS.alias("detected_at"),
    )
)
good_df = df2.where(F.col("violation_type").isNull())

# Cache so the two counts below don't recompute the full pipeline twice.
df2.cache()
total_rows = df2.count()
bad_rows = bad_df.count()
bad_ratio = (bad_rows / total_rows) if total_rows > 0 else 0.0

# Persist violations only when there are any (avoids creating empty files).
if bad_rows > 0:
    bad_df.write.mode("append").format("parquet").saveAsTable(BAD_TABLE)

samples_df = bad_df.limit(100)

print({ "table": SOURCE_TABLE, "column": DATE_COL, "total_rows": total_rows, "bad_rows": bad_rows, "bad_ratio": bad_ratio })
samples = samples_df.toPandas()
print(samples)

# Blocking threshold: fail the job when more than 0.1% of rows are bad.
if bad_ratio > 0.001:
    raise RuntimeError(f"[BLOCKED] {SOURCE_TABLE}.{DATE_COL} bad_ratio={bad_ratio:.6%} > 0.1%")
B. Spark SQL(仅 SQL 作业或调度中使用) -- 参数 -- set pipeline.run_date='2024-07-02';
-- Column check for fact_orders.order_date (Spark SQL).
-- FIXES vs. original:
--   * COUNT() is invalid SQL — must be COUNT(*).
--   * `!~` is a PostgreSQL-only regex operator, not supported by Spark SQL;
--     replaced with NOT (... RLIKE ...).
--   * CTEs are statement-scoped: the original attached them to the INSERT and
--     then referenced `stat`/`bad` from later statements, which cannot run.
--     A temporary view is used instead so all three statements share it.
CREATE OR REPLACE TEMPORARY VIEW marked AS
SELECT
    order_id,
    order_date_raw,
    order_date_trim,
    CASE
        WHEN order_date_trim IS NULL OR order_date_trim = '' THEN 'NULL_OR_BLANK'
        WHEN lower(order_date_trim) IN ('n/a','na','null','none','-','—') THEN 'PLACEHOLDER'
        WHEN NOT (order_date_trim RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$') THEN 'INVALID_FORMAT'
        WHEN TO_DATE(order_date_trim, 'yyyy-MM-dd') IS NULL THEN 'INVALID_CALENDAR_DATE'
        WHEN TO_DATE(order_date_trim, 'yyyy-MM-dd') < DATE '2000-01-01'
          OR TO_DATE(order_date_trim, 'yyyy-MM-dd') > DATE '2099-12-31' THEN 'OUT_OF_RANGE'
        WHEN TO_DATE(order_date_trim, 'yyyy-MM-dd') > COALESCE(TO_DATE('${pipeline.run_date}'), current_date()) THEN 'FUTURE_DATE'
        ELSE NULL
    END AS violation_type
FROM (
    SELECT
        order_id,
        CAST(order_date AS STRING) AS order_date_raw,
        TRIM(CAST(order_date AS STRING)) AS order_date_trim
    FROM fact_orders
);

-- 1) Persist violations
INSERT INTO bad_records.fact_orders_order_date
SELECT order_id, order_date_raw, violation_type, current_timestamp() AS detected_at
FROM marked
WHERE violation_type IS NOT NULL;

-- 2) Summary statistics (COUNT of a nullable column counts only bad rows)
SELECT
    COUNT(*) AS total_rows,
    COUNT(violation_type) AS bad_rows,
    COUNT(violation_type) / COUNT(*) AS bad_ratio
FROM marked;

-- 3) Sample (max 100)
SELECT order_id, order_date_raw, violation_type
FROM marked
WHERE violation_type IS NOT NULL
LIMIT 100;
-- 4) 阻断(若调度系统支持基于查询结果的阈值判断,可用上面的 bad_ratio 触发失败)
说明:Spark SQL 的正则匹配操作符是 RLIKE;`!~` 是 PostgreSQL 方言操作符,Spark SQL 并不支持,格式否定判断应写为 NOT (order_date_trim RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$')。
C. 违规落地表 DDL(Hive/Spark 兼容) CREATE SCHEMA IF NOT EXISTS bad_records;
CREATE TABLE IF NOT EXISTS bad_records.fact_orders_order_date ( order_id STRING, order_date_raw STRING, violation_type STRING, detected_at TIMESTAMP ) USING PARQUET;
-- 如需分区(建议按检测日期) -- PARTITIONED BY (dt STRING) 并在写入时加 dt=DATE_FORMAT(CURRENT_TIMESTAMP(), 'yyyy-MM-dd')
D. 存储层 CHECK 约束(若引擎支持) 目标:尽量在存储层阻止违规数据入表(动态“不得晚于 T 日”不适用于静态约束)。
Delta Lake(Databricks/开源 Delta,CHECK 在写时强制执行) -- order_date 为 STRING 列时,格式 + 静态范围 ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_format CHECK (order_date RLIKE '^[0-9]{4}-[0-9]{2}-[0-9]{2}$');
ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK (to_date(order_date, 'yyyy-MM-dd') >= DATE '2000-01-01' AND to_date(order_date, 'yyyy-MM-dd') <= DATE '2099-12-31');
-- 若将字段类型定为 DATE,更稳妥(避免格式问题),则仅需范围约束 -- ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK (order_date BETWEEN DATE '2000-01-01' AND DATE '2099-12-31');
PostgreSQL(约束强制,推荐把字段类型定为 DATE) -- 若列类型为 DATE(推荐) ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK (order_date >= DATE '2000-01-01' AND order_date <= DATE '2099-12-31');
-- 若列为 TEXT,需同时校验格式与可转日期(CAST 失败会阻止写入) ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_format CHECK (order_date ~ '^[0-9]{4}-[0-9]{2}-[0-9]{2}$'); ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_cast_and_range CHECK (order_date::date >= DATE '2000-01-01' AND order_date::date <= DATE '2099-12-31');
ClickHouse(CONSTRAINT 在插入时强制) ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_format CHECK match(order_date, '^[0-9]{4}-[0-9]{2}-[0-9]{2}$');
ALTER TABLE fact_orders ADD CONSTRAINT chk_order_date_range CHECK parseDateTimeBestEffortOrNull(order_date) >= toDate('2000-01-01') AND parseDateTimeBestEffortOrNull(order_date) <= toDate('2099-12-31');
备注:
六、实施与运维建议
以下为表 fact_orders 的列 order_status 的“枚举值校验”可执行规则与检查步骤(含示例代码)。目标是确保该字段仅取自统一状态集,规范大小写与空白,避免未知占位与同义异写,支撑履约、退款与收入确认的一致性。
一、校验规则(可操作定义)
二、检查要点与统计输出
三、异常处理与自动修复策略
四、实施建议
五、示例代码
MERGE INTO dim_order_status d USING ( SELECT 'PENDING' AS c, '待支付/待处理' AS s UNION ALL SELECT 'PAID', '已支付' UNION ALL SELECT 'SHIPPED', '已发货' UNION ALL SELECT 'DELIVERED', '已送达/签收' UNION ALL SELECT 'CANCELLED', '已取消' UNION ALL SELECT 'REFUNDED', '已退款' UNION ALL SELECT 'FAILED_PAYMENT', '支付失败' ) src ON d.status_code = src.c WHEN NOT MATCHED THEN INSERT (status_code, status_desc) VALUES (src.c, src.s);
-- 可选:同义词/常见拼写映射表(优先精确/正则,避免过度泛化) CREATE TABLE IF NOT EXISTS ref_order_status_synonyms ( pattern VARCHAR(64), -- 匹配模式(存放大写+去空格的形态) mapped_to VARCHAR(32), -- 目标标准值 match_type VARCHAR(16) -- EXACT 或 REGEX );
INSERT INTO ref_order_status_synonyms (pattern, mapped_to, match_type) VALUES ('PAID', 'PAID', 'EXACT'), ('PENDING', 'PENDING', 'EXACT'), ('SHIPPED', 'SHIPPED', 'EXACT'), ('DELIVERED', 'DELIVERED', 'EXACT'), ('CANCELLED', 'CANCELLED', 'EXACT'), ('REFUNDED', 'REFUNDED', 'EXACT'), ('FAILEDPAYMENT', 'FAILED_PAYMENT', 'EXACT'), -- 常见变体(存放去空格+大写后形态) ('PAID', 'PAID', 'EXACT'), ('PAYED', 'PAID', 'EXACT'), ('SHIPED', 'SHIPPED', 'EXACT'), ('SHIPPEDD', 'SHIPPED', 'EXACT'), ('CANCELED', 'CANCELLED', 'EXACT'), ('FAILED_PAYMENT', 'FAILED_PAYMENT', 'EXACT');
-- 有效值分布 SELECT normalized_status AS status_code, COUNT(*) AS cnt FROM chk WHERE violation_reason IS NULL GROUP BY normalized_status ORDER BY cnt DESC;
-- 未知/违规 TopN SELECT COALESCE(upper_status, '(NULL)') AS raw_upper_status, COUNT(*) AS cnt, MIN(violation_reason) AS reason FROM chk WHERE violation_reason IS NOT NULL GROUP BY COALESCE(upper_status, '(NULL)') ORDER BY cnt DESC FETCH FIRST 50 ROWS WITH TIES;
-- 插入违规记录(recommended_mapping:若 enum 中不存在且同义词表命中则给出;否则 NULL) INSERT INTO bad_records.fact_orders_order_status SELECT c.order_id, c.order_status AS raw_order_status, c.normalized_status, c.violation_reason, CASE WHEN c.violation_reason = 'NOT_IN_ENUM' THEN rs2.mapped_to ELSE NULL END AS recommended_mapping, c.source_system, :batch_id AS etl_batch_id, CURRENT_TIMESTAMP AS processed_at FROM chk c LEFT JOIN ref_order_status_synonyms rs2 ON c.compact_status = rs2.pattern AND rs2.match_type = 'EXACT' WHERE c.violation_reason IS NOT NULL;
-- 无法安全修复的记录保留在 bad_records,待业务确认后回填
# Canonical status set and placeholder tokens (compared after upper-casing).
ALLOWED = {"PENDING", "PAID", "SHIPPED", "DELIVERED", "CANCELLED", "REFUNDED", "FAILED_PAYMENT"}
PLACEHOLDERS = {"NULL", "N/A", "NA", "NONE", "-", "UNKNOWN"}

# Known misspellings/variants -> canonical value. Keys are the upper-cased
# form; note "FAILED PAYMENT" can never match after the space-strip below,
# but is kept for completeness/documentation.
COMMON_MAP = {
    "PAYED": "PAID",
    "SHIPED": "SHIPPED",
    "SHIPPEDD": "SHIPPED",
    "CANCELED": "CANCELLED",
    "FAILED PAYMENT": "FAILED_PAYMENT",
    "FAILEDPAYMENT": "FAILED_PAYMENT",
}


@F.udf("string")
def normalize_status(s):
    """Map a raw status string to its canonical value.

    Returns None when the value is NULL/blank, a placeholder token, or not
    safely mappable (conservative: no fuzzy repair, to avoid mis-mapping;
    a levenshtein<=1 whitelist could be added if needed).
    """
    if s is None:
        return None
    t = s.strip()
    if t == "":
        return None
    up = t.upper()
    if up in PLACEHOLDERS:
        return None
    compact = up.replace(" ", "")
    if compact in ALLOWED:
        return compact
    if compact in COMMON_MAP:
        return COMMON_MAP[compact]
    return None


def with_status_validation(df):
    """Split *df* into (valid, invalid) frames based on normalized_status.

    FIX: the original filtered invalid rows with only ~isin(ALLOWED); for a
    NULL normalized_status that predicate evaluates to NULL (not True), so
    placeholder/unknown rows were silently dropped from BOTH frames and
    never reached bad_records. isNull() is now ORed into the invalid filter.
    """
    marked = df.withColumn("normalized_status", normalize_status(F.col("order_status")))
    is_allowed = F.col("normalized_status").isin(list(ALLOWED))
    df_valid = marked.filter(is_allowed)
    df_invalid = marked.filter(F.col("normalized_status").isNull() | ~is_allowed)
    return df_valid, df_invalid


df_valid, df_invalid = with_status_validation(df_orders)

# FIX: the original grouped `df1`, a name local to with_status_validation
# (NameError at this scope). Rebuild the full marked frame from the two halves.
df_stats = df_valid.unionByName(df_invalid).groupBy("normalized_status").count()

# Persist violations with reason, batch id and timestamp.
# FIX: the original chained .withColumn across lines without enclosing
# parentheses — a SyntaxError in Python.
(
    df_invalid
    .withColumn(
        "violation_reason",
        F.when(F.col("normalized_status").isNull(), F.lit("NULL_OR_PLACEHOLDER"))
        .otherwise(F.lit("NOT_IN_ENUM")),
    )
    .withColumn("etl_batch_id", F.lit(batch_id))
    .withColumn("processed_at", F.current_timestamp())
    .write.mode("append").insertInto("bad_records.fact_orders_order_status")
)

# Write valid rows back with the normalized status value.
df_valid.select(
    "order_id",
    F.col("normalized_status").alias("order_status"),
    "source_system",
    # other business columns...
).write.mode("append").insertInto("fact_orders")
-- 若需标准化,可在 staging 层构建视图:upper(trim(order_status)) as order_status,然后在该视图上做 accepted_values
六、部署与运行顺序建议
外键校验对象
一、校验规则(可执行约束)
二、检查步骤(分区级或时间窗口级执行)
三、统计口径与输出样例
四、异常处理与运维策略
五、配置项(建议以集中配置管理)
六、实施与治理建议
七、坏数据表结构建议(字段清单)
通过上述规则与步骤,可实现对 fact_orders.customer_id 的外键完整性、SCD2 时效一致性及可选活跃状态的一致校验,同时提供可观测的质量指标、明确的异常处置路径与落地治理保障。
用一条高效提示词,自动生成可执行的数据验证规则,让每一列数据都有清晰边界与可追溯标准,帮助团队把“数据干净度”变成可复制的竞争力。
在导入、转换、落库环节快速生成列级校验规则,配套示例数据与说明,缩短上线周期并减少回滚。
搭建统一的字段校验模板,批量覆盖核心表,提升数据质量标准化,支持审计与走查。
为指标口径关键字段制定格式与范围限制,提前拦截脏数据,减少报表返工,加快周报月报交付。
将模板生成的提示词复制粘贴到您常用的 Chat 应用(如 ChatGPT、Claude 等),即可直接对话使用,无需额外开发。适合个人快速体验和轻量使用场景。
把提示词模板转化为 API,您的程序可任意修改模板参数,通过接口直接调用,轻松实现自动化与批量处理。适合开发者集成与业务系统嵌入。
在 MCP client 中配置对应的 server 地址,让您的 AI 应用自动调用提示词模板。适合高级用户和团队协作,让提示词在不同 AI 工具间无缝衔接。
半价获取高级提示词-优惠即将到期