
Column Data Normalization

366 views
36 trials
10 purchases
Updated Nov 24, 2025

Generates professional data-normalization rules for a specified table column, providing technical solutions that keep data preprocessing, cleaning, and pipeline construction accurate and consistent, improving data quality and data-engineering efficiency.

""" 规则版本: name_normalization.v1

目标/Goal:

  • 将 crm_contacts.full_name 统一为可比的规范形态,支持中英文团队协作。
  • Normalize crm_contacts.full_name into a comparable canonical form for bilingual (Chinese-English) collaboration.

规则/Rules:

  1. 预清洗 Pre-cleaning:
    • Trim leading/trailing whitespace; collapse consecutive spaces into one.
    • Remove HTML tags, control characters (Cc), and emoji.
    • Record the digit ratio of the original value; if it exceeds 20%, flag as invalid:numeric_ratio and return NULL.
    • Remove Arabic numerals and trailing punctuation (.,;:_-); keep legal middle separators (-, ·, ’).
  2. 大小写统一 Case normalization:
    • Apply Title Case to Latin characters; preserve legal apostrophes and hyphens (e.g., McDonald, O’Connor, Anna-Maria).
    • Keep non-Latin/Chinese text as-is; only normalize redundant spaces and symbols.
  3. 符号标准 Symbol standardization:
    • Unify full-width/half-width forms, quotes, and hyphens: standardize apostrophe variants to ’ (U+2019) and hyphens to - (U+002D); keep middle separators (-, ·, ’).
  4. 文化姓名处理 Cultural handling:
    • Chinese names are neither split nor reordered (e.g., 王小明).
    • Western names follow Firstname Lastname; keep middle-name initials and the position of honorifics.
  5. 去重 Deduplication:
    • Group by the SHA-256 of the normalized string.
    • Keep the most recent record (max updated_at); aggregate source IDs and occurrence counts.
  6. 缺失值 Missing values:
    • Empty strings, whitespace-only values, or values that are empty after cleaning become NULL.
    • Write the reason (empty/invalid) to the audit column; notes may be attached.
  7. 白名单与例外 Whitelist & exceptions:
    • Keep legal prefixes/suffixes (e.g., Dr., Ms., PhD), maintained and standardized in the lookup_names_titles dictionary table.
  8. 输出 Output:
    • Produce rule descriptions in Chinese and English plus Python example code; the code is function-based, idempotent, reusable, and callable from Airflow/Prefect tasks.
  9. 校验 Validation:
    • The normalized length must be 1–120 characters; longer values are truncated to 120 with audit reason truncated and notes.
  10. 版本化 Versioning:
    • The rule version is fixed at name_normalization.v1; later additions must not break compatibility.

Examples:

  • ' ms. anna o’connor ' -> 'Ms. Anna O’Connor'
  • '张三' -> '张三' """

from __future__ import annotations

import re
import hashlib
import unicodedata
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

RULE_VERSION = "name_normalization.v1"

# Title whitelist/lookup dictionary table (可扩展/Extensible)

lookup_names_titles: Dict[str, Dict[str, str]] = {
    "prefix": {
        # English
        "mr": "Mr.", "mrs": "Mrs.", "ms": "Ms.", "miss": "Ms.",
        "dr": "Dr.", "prof": "Prof.", "sir": "Sir", "madam": "Ms.", "mx": "Mx.",
        # Chinese to English mappings (optional normalization)
        "先生": "Mr.", "女士": "Ms.", "小姐": "Ms.", "博士": "Dr.", "教授": "Prof.",
    },
    "suffix": {
        # English
        "phd": "PhD", "md": "MD", "dds": "DDS", "dvm": "DVM", "jd": "JD", "mba": "MBA",
        "jr": "Jr.", "sr": "Sr.", "iii": "III", "iv": "IV",
        # Chinese degrees could be mapped if desired; left empty to avoid lossy conversion
    },
}

# Emoji ranges for broad removal (BMP + supplementary planes)

_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # Emoticons
    "\U0001F300-\U0001F5FF"  # Misc Symbols and Pictographs
    "\U0001F680-\U0001F6FF"  # Transport & Map
    "\U0001F1E6-\U0001F1FF"  # Regional Indicators
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA70-\U0001FAFF"  # Symbols & Pictographs Extended-A
    "\U00002700-\U000027BF"  # Dingbats
    "\U00002600-\U000026FF"  # Misc symbols
    "]",
    flags=re.UNICODE,
)

_HTML_PATTERN = re.compile(r"<[^>]*>")
_MULTI_SPACE_PATTERN = re.compile(r"\s+")

# Allowed middle separators inside names

_ALLOWED_SEPARATORS = {"-", "·", "’"} # hyphen, middle dot, right single quotation mark

# Trailing punctuation to strip from end of string

_TRAILING_PUNCT_CHARS = ".,;:_-"  # per rule 1: trailing .,;:_- are stripped

def _strip_html(s: str) -> str:
    return _HTML_PATTERN.sub("", s)

def _remove_emoji(s: str) -> str:
    return _EMOJI_PATTERN.sub("", s)

def _remove_control_chars(s: str) -> str:
    # Remove Unicode category Cc (control chars)
    return "".join(ch for ch in s if unicodedata.category(ch) != "Cc")

def _normalize_width_and_symbols(s: str) -> str:
    # Normalize width (fullwidth -> halfwidth) and standardize symbols
    s = unicodedata.normalize("NFKC", s)
    # Standardize apostrophes to ’ (U+2019)
    s = s.replace("\u2018", "’").replace("'", "’").replace("`", "’")
    # Standardize dash variants to ASCII hyphen-minus (en dash, em dash, minus sign)
    s = s.replace("\u2013", "-").replace("\u2014", "-").replace("\u2212", "-")
    # Leave middle dot as is
    return s

def _strip_trailing_punct(s: str) -> str:
    return s.rstrip(_TRAILING_PUNCT_CHARS)

def _collapse_spaces(s: str) -> str:
    s = s.strip()
    s = _MULTI_SPACE_PATTERN.sub(" ", s)
    return s

def _digit_ratio(s: str) -> float:
    if not s:
        return 0.0
    total = sum(1 for ch in s if not ch.isspace())
    digits = sum(1 for ch in s if ch.isdigit())
    return digits / total if total > 0 else 0.0

def _is_latin_token(token: str) -> bool:
    # Consider a token Latin if all letters are from Latin scripts or separators/punct
    for ch in token:
        if ch.isalpha():
            try:
                name = unicodedata.name(ch)
            except ValueError:
                return False
            if "LATIN" not in name:
                return False
        elif ch in _ALLOWED_SEPARATORS or ch in {".", " "}:
            continue
        else:
            # other characters like Chinese/Cyrillic
            return False
    return True

def _titlecase_latin_token(token: str) -> str:
    # Break on allowed separators and apply casing per segment
    def tc_segment(seg: str) -> str:
        if not seg:
            return seg
        lower = seg.lower()
        # Preserve Mc/Mac style (basic heuristics)
        if lower.startswith("mc") and len(lower) > 2:
            return "Mc" + lower[2:3].upper() + lower[3:]
        # Keep single-letter initials uppercase (e.g., "j." -> "J.")
        if len(seg) == 1:
            return seg.upper()
        return lower[0].upper() + lower[1:]

    # Tokens can have periods (e.g., "dr."); leave them for the title-mapping step.
    # Split by apostrophe and hyphen to title-case each segment.
    parts_apostrophe = token.split("’")
    for i, ap in enumerate(parts_apostrophe):
        subparts = [tc_segment(sp) for sp in ap.split("-")]
        parts_apostrophe[i] = "-".join(subparts)
    return "’".join(parts_apostrophe)

def _standardize_prefix(token: str, title_lookup: Dict[str, Dict[str, str]]) -> Optional[str]:
    # Normalize token for matching (strip dots)
    candidate = token.replace(".", "")
    canonical = title_lookup.get("prefix", {}).get(candidate.lower())
    if canonical:
        return canonical
    return None

def _standardize_suffix(token: str, title_lookup: Dict[str, Dict[str, str]]) -> Optional[str]:
    candidate = token.replace(".", "")
    canonical = title_lookup.get("suffix", {}).get(candidate.lower())
    if canonical:
        return canonical
    return None

def _process_tokens(tokens: List[str], title_lookup: Dict[str, Dict[str, str]]) -> List[str]:
    if not tokens:
        return tokens

    # Handle prefix at first token
    first = tokens[0]
    pref = _standardize_prefix(first, title_lookup)
    if pref:
        tokens[0] = pref
        start_idx = 1
    else:
        start_idx = 0

    # Handle suffix at last token
    last = tokens[-1]
    suff = _standardize_suffix(last, title_lookup)
    if suff:
        tokens[-1] = suff
        end_idx = len(tokens) - 1
    else:
        end_idx = len(tokens)

    # Title-case middle tokens that are Latin
    for i in range(start_idx, end_idx):
        tok = tokens[i]
        if _is_latin_token(tok):
            tokens[i] = _titlecase_latin_token(tok)
        else:
            # Non-Latin: leave as-is (e.g., Chinese); width/symbol normalization already applied
            tokens[i] = tok
    return tokens

def _remove_digits(s: str) -> str:
    return re.sub(r"\d+", "", s)

def _normalize_name_string(raw: str, title_lookup: Dict[str, Dict[str, str]]) -> Tuple[Optional[str], Dict[str, Any]]:
    """
    Returns (normalized_name or None, audit_info).
    audit_info contains: reason, notes
    """
    audit = {"reason": None, "notes": {}}

    if raw is None:
        audit["reason"] = "empty"
        return None, audit

    original = raw

    # Numeric ratio check on original text (excluding spaces)
    ratio = _digit_ratio(original)
    if ratio > 0.20:
        audit["reason"] = "invalid:numeric_ratio"
        audit["notes"]["digit_ratio"] = ratio
        return None, audit

    # Pre-cleaning
    s = _strip_html(original)
    s = _remove_control_chars(s)
    s = _remove_emoji(s)
    s = _normalize_width_and_symbols(s)
    s = _collapse_spaces(s)

    # Remove digits (Arabic numerals) per rule
    s = _remove_digits(s)

    # Strip trailing punctuation
    s = _strip_trailing_punct(s)

    # Collapse spaces again if removal created double spaces
    s = _collapse_spaces(s)

    # Missing value check
    if not s or s.strip() == "":
        audit["reason"] = "empty"
        return None, audit

    # Tokenization by spaces for prefix/suffix handling and Latin title-case
    tokens = s.split(" ")

    # Standardize titles and apply Latin title-case
    tokens = _process_tokens(tokens, title_lookup)

    normalized = " ".join(tokens)

    # Length validation
    nlen = len(normalized)
    if nlen == 0:
        audit["reason"] = "empty"
        return None, audit
    if nlen > 120:
        normalized = normalized[:120]
        audit["reason"] = "truncated"
        audit["notes"]["original_length"] = nlen
        audit["notes"]["truncated_to"] = 120
    else:
        audit["reason"] = "ok"

    return normalized, audit

def _sha256_hex(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

def normalize_full_name(
    name: Optional[str],
    *,
    title_lookup: Dict[str, Dict[str, str]] = lookup_names_titles,
) -> Dict[str, Any]:
    """
    Normalize a full name string according to name_normalization.v1.

    Returns a dict:
    {
        "normalized_full_name": Optional[str],
        "full_name_hash": Optional[str],  # SHA-256 of normalized string
        "audit_reason": str,              # "ok", "empty", "invalid:numeric_ratio", "truncated"
        "audit_notes": Dict[str, Any],
        "version": RULE_VERSION,
    }

    Idempotent: applying the function multiple times yields the same result.
    """
    normalized, audit = _normalize_name_string(name, title_lookup)
    result = {
        "normalized_full_name": normalized,
        "full_name_hash": _sha256_hex(normalized) if normalized else None,
        "audit_reason": audit["reason"],
        "audit_notes": audit.get("notes", {}),
        "version": RULE_VERSION,
    }
    return result
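
# A quick spot-check of the idempotence guarantee documented above (illustrative sketch;
# the helper name and sample strings are not part of the original rule set).
def _check_idempotence() -> None:
    for sample in [" ms. anna o’connor ", "Dr john mcDonald phd", "张三"]:
        once = normalize_full_name(sample)["normalized_full_name"]
        twice = normalize_full_name(once)["normalized_full_name"]
        assert once == twice, f"not idempotent for {sample!r}: {once!r} != {twice!r}"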

def _parse_datetime(dt: Any) -> Optional[datetime]:
    if dt is None:
        return None
    if isinstance(dt, datetime):
        return dt
    # Try ISO 8601
    try:
        # Support 'Z' suffix
        if isinstance(dt, str):
            if dt.endswith("Z"):
                dt = dt[:-1]
            return datetime.fromisoformat(dt)
    except Exception:
        return None
    return None

def deduplicate_contacts(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Deduplicate contacts by normalized_full_name SHA-256 hash.

    Input records: list of dicts, each containing at least:
        {
            "id": Any,
            "full_name": str,
            "source_id": Any,
            "updated_at": datetime or ISO8601 str,
            ... (other fields)
        }

    Process:
    - Normalize full_name (apply rules).
    - Group by normalized_full_name hash.
    - Keep the latest record (max updated_at) per group.
    - Summarize source_ids (unique) and occurrences (count).
    - Records with NULL normalized_full_name are excluded from grouping and returned individually with audit.

    Returns list of dicts:
        {
            "id": Any,                           # id of retained (latest) record
            "normalized_full_name": Optional[str],
            "full_name_hash": Optional[str],
            "audit_reason": str,
            "audit_notes": Dict[str, Any],
            "version": RULE_VERSION,
            "source_ids": List[Any],             # aggregated for grouped records
            "occurrences": int,                  # count in group
            "representative_record": Dict[str, Any],  # latest record snapshot
        }
    """
    groups: Dict[str, Dict[str, Any]] = {}
    invalids: List[Dict[str, Any]] = []

    for rec in records:
        norm = normalize_full_name(rec.get("full_name"))
        updated = _parse_datetime(rec.get("updated_at"))

        enriched = {
            "id": rec.get("id"),
            "normalized_full_name": norm["normalized_full_name"],
            "full_name_hash": norm["full_name_hash"],
            "audit_reason": norm["audit_reason"],
            "audit_notes": norm["audit_notes"],
            "version": RULE_VERSION,
            "source_ids": [rec.get("source_id")],
            "occurrences": 1,
            "representative_record": rec,
            "_updated_at": updated or datetime.min,
        }

        if norm["normalized_full_name"] is None or norm["full_name_hash"] is None:
            # keep as standalone invalid/empty
            invalids.append(enriched)
            continue

        key = norm["full_name_hash"]
        if key not in groups:
            groups[key] = enriched
        else:
            grp = groups[key]
            # Aggregate source_ids
            grp["source_ids"].append(rec.get("source_id"))
            grp["occurrences"] += 1
            # Choose latest record
            cur_dt = grp["_updated_at"]
            new_dt = enriched["_updated_at"]
            if new_dt and new_dt > cur_dt:
                grp["id"] = enriched["id"]
                grp["representative_record"] = rec
                grp["_updated_at"] = new_dt
            # The group keeps the normalized_full_name and audit of its first member;
            # the representative (latest) record is tracked separately above.
            groups[key] = grp

    # Finalize output: unique source_ids, remove internal fields
    output: List[Dict[str, Any]] = []
    for g in groups.values():
        g["source_ids"] = [sid for sid in set(g["source_ids"]) if sid is not None]
        g.pop("_updated_at", None)
        output.append(g)

    # Include invalids as independent (no grouping)
    for inv in invalids:
        inv["source_ids"] = [sid for sid in set(inv["source_ids"]) if sid is not None]
        inv.pop("_updated_at", None)
        output.append(inv)

    return output

# Example reusable task wrappers

def process_batch(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Batch processor suitable for Airflow/Prefect tasks:
    - Applies normalization and deduplication.
    - Returns normalized, deduplicated list with audit info.
    """
    return deduplicate_contacts(records)

def airflow_task(**context) -> List[Dict[str, Any]]:
    """
    Airflow-compatible callable.
    Expects 'records' in context['params'] or context['ti'].xcom_pull().
    """
    params = context.get("params", {}) if context else {}
    records = params.get("records")
    if records is None and context:
        try:
            records = context["ti"].xcom_pull(task_ids=params.get("upstream_task_id"))
        except Exception:
            records = []
    records = records or []
    return process_batch(records)

def prefect_task(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Prefect-compatible callable.
    """
    return process_batch(records)
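
# One possible orchestration wiring for the callables above; an illustrative sketch only.
# It assumes Airflow 2.4+ is installed; the dag_id/task_id values are hypothetical.
# (With Prefect, prefect_task can likewise be wrapped with prefect.task inside a @flow.)
def build_airflow_dag():
    from datetime import datetime as dt          # local imports keep this module importable
    from airflow import DAG                      # even when Airflow is not installed
    from airflow.operators.python import PythonOperator

    with DAG(
        dag_id="crm_contacts_name_normalization",   # hypothetical DAG id
        start_date=dt(2024, 1, 1),
        schedule=None,
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id="normalize_contacts",
            python_callable=airflow_task,
            params={"records": []},                 # or let airflow_task pull records via XCom
        )
    return dag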

# ---- Demonstration ----

def _demo():
    samples = [
        {"id": 1, "full_name": " ms. anna o’connor ", "source_id": "crm_a", "updated_at": "2024-01-02T12:00:00Z"},
        {"id": 2, "full_name": "Ms anna O'CONNOR", "source_id": "crm_b", "updated_at": "2024-03-05T09:30:00Z"},
        {"id": 3, "full_name": "张三", "source_id": "crm_c", "updated_at": "2024-05-01T08:00:00Z"},
        {"id": 4, "full_name": "Dr john mcDonald phd", "source_id": "crm_d", "updated_at": "2024-02-02T10:00:00Z"},
        {"id": 5, "full_name": " ", "source_id": "crm_e", "updated_at": "2024-02-03T10:00:00Z"},
        {"id": 6, "full_name": "Anna 1234 O’Connor 5678", "source_id": "crm_f", "updated_at": "2024-02-04T10:00:00Z"},
    ]
    result = process_batch(samples)
    for r in result:
        print(r)
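
# Expected outcomes for the demo inputs, derived from the rules above (illustrative sanity
# checks, not captured program output; the helper name is not part of the original module).
def _check_expected_examples() -> None:
    assert normalize_full_name(" ms. anna o’connor ")["normalized_full_name"] == "Ms. Anna O’Connor"
    assert normalize_full_name("Ms anna O'CONNOR")["normalized_full_name"] == "Ms. Anna O’Connor"
    assert normalize_full_name("张三")["normalized_full_name"] == "张三"
    assert normalize_full_name("Dr john mcDonald phd")["normalized_full_name"] == "Dr. John McDonald PhD"
    assert normalize_full_name(" ")["audit_reason"] == "empty"
    assert normalize_full_name("Anna 1234 O’Connor 5678")["audit_reason"] == "invalid:numeric_ratio"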

if __name__ == "__main__":
    _demo()

-- date_normalization.v1 --
-- Goal: normalize ecommerce_orders.order_date to ISO 8601 YYYY-MM-DD; standardize time zones and handle anomalies.
-- Output: rule explanation (as comments) plus an executable SQL example (cleaning, parsing, validation, conversion,
--         and audit-column writes).
-- Notes:
-- 1) Pre-cleaning: strip extra whitespace and text noise (e.g. 'Date:' prefixes, bracketed annotations), unify
--    separators (/, ., - -> -), and remove ordinal suffixes (st/nd/rd/th).
-- 2) Input-format parsing: parse by locale preference first (this example: EU/DMY; switch to MDY if needed); supports:
--      - 'YYYY-MM-DD', 'DD/MM/YYYY', 'MM-DD-YYYY', 'YYYY/MM/DD'
--      - English natural language: 'Mar 4, 2025' / 'March 4, 2025'
--      - With time and time zone: '2025-03-04T23:30:00-05:00', '2025/03/04 23:30:00 PST'
--    Safe fallback: multi-pattern TRY parsing; if every pattern fails, the value becomes NULL.
-- 3) Time-zone unification: if the string carries a time zone, take the date in that local time; if it has a time but
--    no zone, interpret it as local time in the configured time zone.
--    Early-morning cross-day window: local times between 00:00 and 01:59 are assigned to the previous day (local date - 1).
--    Only the date part (YYYY-MM-DD) is stored in the final output.
-- 4) Invalid-date validation: leap years and month/day ranges are guaranteed by the TRY_ functions; '0000-00-00',
--    '99/99/9999', and digit strings that do not form a valid date become NULL with reason invalid_date.
-- 5) Missing values: 'N/A', 'unknown', empty strings, or values empty after cleaning -> NULL with reason missing;
--    the original value is written to raw_order_date for traceability.
-- 6) Audit columns: raw_order_date, order_date_parse_format, order_date_src_timezone, order_date_invalid_reason,
--    normalization_version.
-- 7) Can be used as a view, or with UPDATE/INSERT INTO to persist the normalized values.

-- Note: this example uses the Snowflake SQL dialect; on other engines, substitute the equivalent functions
-- (TRY_TO_DATE/TRY_TO_TIMESTAMP[_TZ], CONVERT_TIMEZONE, REGEXP_REPLACE, etc.).

-- Configurable parsing preference and default time zone (Preferred locale: DMY; Default TZ: Europe/Berlin).
-- Set locale_preference to 'MDY' for US-style priority; set preferred_tz to your business region.
WITH cfg AS (
  SELECT
    'DMY'::STRING                   AS locale_preference,  -- 解析优先顺序/Parsing priority
    'Europe/Berlin'::STRING         AS preferred_tz,       -- 缺省本地时区/Default local timezone
    'date_normalization.v1'::STRING AS version
), src AS (
  SELECT
    t.*,
    CAST(order_date AS STRING) AS raw_order_date           -- 源字段快照/Raw snapshot
  FROM ecommerce_orders t
), clean AS (
  SELECT
    s.*,
    -- 预清洗/Pre-cleaning
    -- 1) 去前后空格 Trim
    -- 2) 去前缀噪声 Remove leading 'Date:' prefix
    -- 3) 去括号/方括号注释 Remove bracketed annotations
    -- 4) 统一分隔符 . / -> -
    -- 5) 移除序数后缀 Remove ordinal suffixes (1st/2nd/3rd/4th ...)
    -- 6) 多空格压缩 Single-space normalization
    NULLIF(
      REGEXP_REPLACE(
        REGEXP_REPLACE(
          REGEXP_REPLACE(
            REGEXP_REPLACE(
              REGEXP_REPLACE(
                TRIM(raw_order_date),
                '(?i)^\s*date\s*:\s*', ''            -- remove leading 'Date:'
              ),
              '\s*[(\[].*?[)\]]', ''                 -- strip (...) or [...]
            ),
            '[./]', '-'                              -- unify separators to '-'
          ),
          '(?<=\d)(?i)(st|nd|rd|th)\b', ''           -- remove ordinal suffixes
        ),
        '\s+', ' '                                   -- collapse multiple spaces
      ),
      ''
    ) AS cleaned_order_date
  FROM src s
), flags AS (
  SELECT
    c.*,
    -- 缺失值识别 Missing values
    CASE
      WHEN cleaned_order_date IS NULL THEN TRUE
      WHEN UPPER(cleaned_order_date) IN ('N/A', 'NA', 'UNKNOWN', 'NULL') THEN TRUE
      ELSE FALSE
    END AS is_missing,
    -- 明显非法模式 Sentinel invalids
    CASE
      WHEN cleaned_order_date IS NULL THEN FALSE
      WHEN REGEXP_LIKE(cleaned_order_date, '^(?i)(0000-00-00|99-99-9999)$') THEN TRUE
      WHEN REGEXP_LIKE(REPLACE(cleaned_order_date, '-', ''), '^\d{8}$') THEN FALSE  -- pure 8-digit strings handled separately
      ELSE FALSE
    END AS is_invalid_sentinel,
    -- 粗略检测时区/时间存在性 Rough detection of time zone / time presence
    REGEXP_LIKE(cleaned_order_date, '(Z\b)|([+-]\d{2}:?\d{2})|\b(UTC|GMT)\b|[A-Za-z]+/[A-Za-z_]+') AS has_tz_hint,
    REGEXP_LIKE(cleaned_order_date, 'T\d{2}:\d{2}|\b\d{1,2}:\d{2}') AS has_time_hint
  FROM clean c
), parsed AS (
SELECT
f.*,
-- 解析尝试/Parsing attempts (按配置优先/locale preference first)
(SELECT locale_preference FROM cfg) AS locale_preference,
(SELECT preferred_tz FROM cfg)      AS preferred_tz,
(SELECT version FROM cfg)           AS normalization_version,

/* 1) 时区感知时间戳:若字符串包含时区提示,优先用 TRY_TO_TIMESTAMP_TZ 自动解析 */
CASE
  WHEN has_tz_hint THEN
    COALESCE(
      TRY_TO_TIMESTAMP_TZ(cleaned_order_date),
      TRY_TO_TIMESTAMP_TZ(REGEXP_REPLACE(cleaned_order_date, ' ', 'T'))
    )
  ELSE NULL
END AS ts_tz,

/* 2) 本地无时区时间戳(按 preferred_tz 解释本地时间;Snowflake 中值无时区,小时窗口在该本地时间上判定) */
COALESCE(
  /* ISO with time */
  TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD"T"HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD HH24:MI'),
  /* DMY/MDY with time (分隔符已统一为 -) */
  CASE WHEN (SELECT locale_preference FROM cfg)='DMY' THEN
    COALESCE(
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI:SS'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI:SS'),  -- fallback
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI')
    )
  ELSE
    COALESCE(
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI:SS'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI:SS'),  -- fallback
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI')
    )
  END,
  /* Natural language with time (rare): 'Mar 4, 2025 13:45:00' */
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Mon DD, YYYY HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Mon DD, YYYY HH24:MI'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Month DD, YYYY HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Month DD, YYYY HH24:MI')
) AS ts_local_ntz,

/* 3) 仅日期(无时间)/Date-only */
COALESCE(
  TRY_TO_DATE(cleaned_order_date, 'YYYY-MM-DD'),
  CASE WHEN (SELECT locale_preference FROM cfg)='DMY' THEN
    COALESCE(
      TRY_TO_DATE(cleaned_order_date, 'DD-MM-YYYY'),
      TRY_TO_DATE(cleaned_order_date, 'MM-DD-YYYY')
    )
  ELSE
    COALESCE(
      TRY_TO_DATE(cleaned_order_date, 'MM-DD-YYYY'),
      TRY_TO_DATE(cleaned_order_date, 'DD-MM-YYYY')
    )
  END,
  TRY_TO_DATE(cleaned_order_date, 'Mon DD, YYYY'),
  TRY_TO_DATE(cleaned_order_date, 'Month DD, YYYY')
) AS d_only

FROM flags f
), evaluated AS (
SELECT
p.*,

/* 解析格式标记/Parse format tag for auditing */
CASE
  WHEN is_missing THEN 'MISSING'
  WHEN is_invalid_sentinel THEN 'SENTINEL_INVALID'
  WHEN ts_tz IS NOT NULL THEN 'TIMESTAMP_TZ_AUTO'
  WHEN ts_local_ntz IS NOT NULL THEN
    CASE
      WHEN REGEXP_LIKE(cleaned_order_date, '^\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(:\d{2})?)?$') THEN 'ISO_LOCAL_TS'
      WHEN locale_preference='DMY' AND REGEXP_LIKE(cleaned_order_date, '^\d{2}-\d{2}-\d{4}') THEN 'DMY_LOCAL_TS'
      WHEN locale_preference='MDY' AND REGEXP_LIKE(cleaned_order_date, '^\d{2}-\d{2}-\d{4}') THEN 'MDY_LOCAL_TS'
      ELSE 'NATURAL_LANG_LOCAL_TS'
    END
  WHEN d_only IS NOT NULL THEN
    CASE
      WHEN REGEXP_LIKE(cleaned_order_date, '^\d{4}-\d{2}-\d{2}$') THEN 'ISO_DATE'
      WHEN REGEXP_LIKE(cleaned_order_date, '^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\b', 'i') THEN 'NATURAL_LANG_DATE'
      ELSE CASE WHEN locale_preference='DMY' THEN 'DMY_DATE' ELSE 'MDY_DATE' END
    END
  ELSE 'UNPARSEABLE'
END AS order_date_parse_format,

/* 源时区标记/Source timezone label */
CASE
  WHEN ts_tz IS NOT NULL THEN
    -- 使用偏移量表示源时区,用 +HH:MM/-HH:MM 格式/Format offset as +HH:MM
    LPAD(
      IFF(
        (DATE_PART('timezone_hour', ts_tz) * 60 + DATE_PART('timezone_minute', ts_tz)) < 0,
        '-', '+'
      ) || LPAD(TO_VARCHAR(ABS(DATE_PART('timezone_hour', ts_tz))), 2, '0') || ':' ||
      LPAD(TO_VARCHAR(ABS(DATE_PART('timezone_minute', ts_tz))), 2, '0'),
    6, '+') -- ensures string shape
  WHEN ts_local_ntz IS NOT NULL THEN (SELECT preferred_tz FROM cfg)
  ELSE NULL
END AS order_date_src_timezone,

/* 本地日期与凌晨窗口调整/Local date with 00:00–01:59 backshift */
CASE
  WHEN ts_tz IS NOT NULL THEN
    CAST(ts_tz AS DATE)
    - IFF(DATE_PART('hour', ts_tz) BETWEEN 0 AND 1, 1, 0)
  WHEN ts_local_ntz IS NOT NULL THEN
    CAST(ts_local_ntz AS DATE)
    - IFF(DATE_PART('hour', ts_local_ntz) BETWEEN 0 AND 1, 1, 0)
  ELSE
    d_only
END AS order_date_norm_candidate

FROM parsed p
), validated AS (
SELECT
e.*,

/* 对纯8位数字进行安全回退(YYYYMMDD 或 DDMMYYYY/ MMDDYYYY)/Safe fallback for 8-digit numbers */
CASE
  WHEN order_date_norm_candidate IS NULL
       AND NOT is_missing
       AND NOT is_invalid_sentinel
       AND REGEXP_LIKE(REPLACE(cleaned_order_date, '-', ''), '^\d{8}$')
  THEN
    /* 首选 YMD,其次按配置优先 DMY/MDY */
    COALESCE(
      TRY_TO_DATE(cleaned_order_date, 'YYYYMMDD'),
      CASE WHEN locale_preference='DMY' THEN
        COALESCE(TRY_TO_DATE(cleaned_order_date, 'DDMMYYYY'), TRY_TO_DATE(cleaned_order_date, 'MMDDYYYY'))
      ELSE
        COALESCE(TRY_TO_DATE(cleaned_order_date, 'MMDDYYYY'), TRY_TO_DATE(cleaned_order_date, 'DDMMYYYY'))
      END
    )
  ELSE NULL
END AS yyyymmdd_fallback

FROM evaluated e
), finalized AS (
SELECT
v.*,
COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) AS order_date_normalized,

CASE
  WHEN is_missing THEN 'missing'
  WHEN is_invalid_sentinel THEN 'invalid_date_sentinel'
  WHEN COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) IS NULL THEN 'invalid_date'
  ELSE NULL
END AS order_date_invalid_reason,

CASE
  WHEN COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) IS NOT NULL THEN 'ok'
  ELSE 'null'
END AS order_date_normalization_status

FROM validated v
)

-- 选项A:创建只读规范化视图(推荐先用于验证)/Option A: Create a read-only normalization view (recommended for validation first)
-- 视图包含审计列与规范化后的日期;不修改原表。The view carries audit columns and the normalized date; the source table is untouched.
SELECT
  /* 原表所有列/All original columns */
  f.* EXCLUDE (
    raw_order_date, cleaned_order_date, is_missing, is_invalid_sentinel, has_tz_hint, has_time_hint,
    locale_preference, preferred_tz, normalization_version, ts_tz, ts_local_ntz, d_only,
    order_date_parse_format, order_date_src_timezone, order_date_norm_candidate, yyyymmdd_fallback,
    order_date_normalized, order_date_invalid_reason, order_date_normalization_status
  ),

  /* 规范化输出/Normalized outputs */
  f.raw_order_date,
  f.cleaned_order_date,
  f.order_date_normalized AS normalized_order_date,   -- 最终 ISO 日期/Final ISO date (YYYY-MM-DD)
  f.order_date_parse_format,
  f.order_date_src_timezone,
  f.order_date_invalid_reason,
  f.order_date_normalization_status,
  f.normalization_version
FROM finalized f;

-- 选项B:将规范化结果落库(示例 UPDATE;需预先在表上添加审计列)
-- Option B: Persist normalized results (example UPDATE; add audit columns beforehand)
-- ALTER TABLE ecommerce_orders
--   ADD COLUMN IF NOT EXISTS raw_order_date_audit STRING,
--   ADD COLUMN IF NOT EXISTS normalized_order_date DATE,
--   ADD COLUMN IF NOT EXISTS order_date_parse_format STRING,
--   ADD COLUMN IF NOT EXISTS order_date_src_timezone STRING,
--   ADD COLUMN IF NOT EXISTS order_date_invalid_reason STRING,
--   ADD COLUMN IF NOT EXISTS order_date_normalization_status STRING,
--   ADD COLUMN IF NOT EXISTS normalization_version STRING;

-- MERGE 示例:以主键 id 作为匹配键(请按实际主键替换)
-- Example MERGE using primary key id (replace <primary_key> with the actual PK)
-- MERGE INTO ecommerce_orders t
-- USING (
--   WITH cfg AS (... same as above ...)
--   SELECT
--     <primary_key> AS pk,
--     raw_order_date,
--     order_date_normalized,
--     order_date_parse_format,
--     order_date_src_timezone,
--     order_date_invalid_reason,
--     order_date_normalization_status,
--     normalization_version
--   FROM finalized
-- ) s
-- ON t.<primary_key> = s.pk
-- WHEN MATCHED THEN UPDATE SET
--   t.raw_order_date_audit            = s.raw_order_date,
--   t.normalized_order_date           = s.order_date_normalized,
--   t.order_date_parse_format         = s.order_date_parse_format,
--   t.order_date_src_timezone         = s.order_date_src_timezone,
--   t.order_date_invalid_reason       = s.order_date_invalid_reason,
--   t.order_date_normalization_status = s.order_date_normalization_status,
--   t.normalization_version           = s.normalization_version;

-- 示例验证/Examples validation (expected):
-- '03/04/2025' (European DMY)   -> '2025-04-03'
-- 'Mar 4, 2025'                 -> '2025-03-04'
-- '2025-03-04T00:30:00-05:00'   -> local 00:30, backshifted per the early-morning window -> '2025-03-03'
-- '0000-00-00'                  -> NULL, invalid_date_sentinel
-- 'N/A'                         -> NULL, missing
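
For engines without TRY_-style parsers, or to unit-test the rule outside the warehouse, the same cleaning and 00:00–01:59 backshift logic can be prototyped in Python. The sketch below is illustrative only; the function name and format list are assumptions, not part of the SQL above.

# Minimal Python sketch of the cleaning + early-morning backshift rule (illustrative only).
import re
from datetime import datetime, timedelta, date
from typing import Optional

def normalize_order_date(raw: Optional[str]) -> Optional[date]:
    if raw is None or raw.strip().upper() in {"", "N/A", "NA", "UNKNOWN", "NULL"}:
        return None                                        # missing
    s = re.sub(r"(?i)^\s*date\s*:\s*", "", raw.strip())    # drop 'Date:' prefix
    s = re.sub(r"\s*[(\[].*?[)\]]\s*$", "", s)             # drop trailing bracketed notes
    s = re.sub(r"[./]", "-", s)                            # unify separators
    s = re.sub(r"(?i)(\d)(st|nd|rd|th)\b", r"\1", s)       # remove ordinal suffixes
    s = re.sub(r"\s+", " ", s).strip()
    # DMY preference first, mirroring the cfg CTE above
    for fmt in ("%Y-%m-%dT%H:%M:%S%z", "%Y-%m-%d %H:%M:%S", "%Y-%m-%d",
                "%d-%m-%Y", "%m-%d-%Y", "%b %d, %Y", "%B %d, %Y"):
        try:
            dt = datetime.strptime(s, fmt)
        except ValueError:
            continue
        if "%H" in fmt and dt.hour < 2:                    # 00:00-01:59 -> previous local day
            dt -= timedelta(days=1)
        return dt.date()
    return None                                            # invalid_date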

// 规则说明(中文 + English)
// 目标 / Goal:
//   将 customer_addresses.country_code 统一为 ISO 3166-1 alpha-2(大写),兼容多语言与俗称别名。
//   Normalize customer_addresses.country_code to ISO 3166-1 alpha-2 (uppercase), supporting multilingual aliases.
//
// 规则 / Rules:
// 1) 文本清理 Text cleaning:
//    - 修剪空格、折叠多空格;统一全角/半角;统一常见分隔符为单空格;去除末尾标点与括号注释。
//    - Trim, collapse multiple spaces; normalize full-width to half-width; normalize separators to single space;
//      remove trailing punctuation and trailing bracketed notes ((), [], {}).
//
// 2) 大小写统一 Casing:
//    - 输入不区分大小写;输出标准代码必须为 [A-Z]{2}(大写)。
//    - Case-insensitive input; output must be two uppercase letters [A-Z]{2}.
//
// 3) 别名映射 Alias mapping:
//    - 使用可维护字典表 iso_country_lookup 将多语言/俗称映射为标准代码;优先通过 alias_key(清洗+规范后的键)匹配。
//    - Use the maintainable dictionary table iso_country_lookup to map aliases to standard codes via alias_key.
//    - 示例 Examples: 'USA'/'U.S.'/'United States' -> 'US'; 'UK'/'United Kingdom' -> 'GB';
//      'Deutschland' -> 'DE'; 'Mainland China'/'PRC' -> 'CN'; 'Korea, South' -> 'KR'.
//    - 字典应包含中文/英文/常见本地语言别名。The lookup should include CN/EN/local-language names.
//
// 4) 缺失值处理 Missing values:
//    - NULL、空串、仅标点/空白 → 输出 NULL;reason = 'missing';记录 raw_value。
//    - NULL/empty/punctuation-only → NULL with reason 'missing'; keep raw_value.
//
// 5) 校验 Validation:
//    - 若输入本身为合法代码(匹配 [A-Za-z]{2},且存在于 master 列表),直接转为大写输出。
//    - 否则尝试别名映射;仍失败 → 输出 NULL;reason = 'unknown_alias'。
//    - If the input is already a valid ISO code (case-insensitive and present in master), uppercase and keep it.
//      Otherwise try the alias map; if still not found → NULL with reason 'unknown_alias'.
//
// 6) 变更审计 Auditing:
//    - 增加字段:normalized_by、rule_version(country_normalization.v1)、source_system、raw_country_value、normalization_reason。
//    - Add fields: normalized_by, rule_version (country_normalization.v1), source_system,
//      raw_country_value, normalization_reason.
//
// 7) 输出与管道适配 Output & pipeline:
//    - 提供 Spark/Scala 批处理与流式清洗示例;未知别名写入错误/侧写表以便回填字典。
//    - Provide Spark/Scala batch and streaming examples; write unknown aliases to an error table for curation.
//
// 推荐维表结构 Recommended lookup schema (Delta/Hive):
//   iso_country_lookup(alias_key STRING, country_code STRING, alias_text STRING, lang STRING, updated_at TIMESTAMP)
//   iso_country_master(country_code STRING)  -- 主表,仅含有效 ISO-2 代码 / master table of valid ISO-2 codes only
//
// ---------------------------------------------------------------------------------------
// Spark/Scala 实现示例 Spark/Scala implementation
// 依赖:Spark 3.x,Structured Streaming 可选。Dependencies: Spark 3.x; Structured Streaming optional.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.broadcast.Broadcast

object CountryNormalization {

// 审计元数据 Audit metadata
val RuleVersion = "country_normalization.v1"
val NormalizedBy = "de_pipeline"

// 将全角字符转换为半角 Convert full-width characters to half-width
private def toHalfWidth(s: String): String = {
  if (s == null) return null
  val sb = new StringBuilder(s.length)
  s.foreach { ch =>
    ch match {
      case '\u3000' => sb.append(' ') // full-width space
      case c if c >= '\uFF01' && c <= '\uFF5E' => sb.append((c - 0xFEE0).toChar) // full-width ASCII block -> ASCII
      case other => sb.append(other)
    }
  }
  sb.toString()
}

// 文本清理:修剪、折叠空格、去括注(尾部)、去尾随标点、统一分隔符
// Text cleanup: trim, collapse spaces, strip trailing bracketed notes, strip trailing punctuation, normalize separators
private def canonicalClean(s: String): String = {
  if (s == null) return null
  val hw = toHalfWidth(s)
  val trimmed = hw.trim
  if (trimmed.isEmpty) return ""
  // Normalize common separators to space
  val sepNorm = trimmed
    .replaceAll("[_./·・]", " ")
    .replaceAll("-", " ")
  val collapsed = sepNorm.replaceAll("\\s+", " ")
  // Remove trailing bracketed notes like "China (Mainland)" -> "China"
  val noTrailingNote = collapsed.replaceAll("\\s*[(\\[【{（][^)\\]】}）]*[)\\]】}）]\\s*$", "")
  // Strip trailing punctuation
  val noTrailingPunct = noTrailingNote.replaceAll("[\\p{Punct}.。、,·・]+$", "").trim
  noTrailingPunct
}

// 生成 alias_key:清洗后转小写,移除所有非字母数字
// Build alias_key: clean, lowercase, remove non-alphanumeric
private def aliasKey(s: String): String = {
  if (s == null) return null
  val cleaned = canonicalClean(s)
  if (cleaned == null || cleaned.isEmpty) return cleaned
  cleaned.toLowerCase.replaceAll("[^a-z0-9]", "")
}

// 加载并广播字典映射与主码集合
// Load and broadcast the alias->code map and the master code set
def buildBroadcastMaps(
    spark: SparkSession,
    lookupTable: String = "iso_country_lookup",
    masterTable: String = "iso_country_master"
): (Broadcast[Map[String, String]], Broadcast[Set[String]]) = {

import spark.implicits._

val lookupDF =
  if (tableExists(spark, lookupTable)) {
    spark.table(lookupTable).select("alias_key", "country_code")
  } else {
    // 最小可用示例字典(生产请使用表)
    // Minimal demo lookup; use table in production
    Seq(
      ("usa", "US"),
      ("us", "US"),
      ("unitedstates", "US"),
      ("u.s", "US"),
      ("unitedstatesofamerica", "US"),
      ("uk", "GB"),
      ("unitedkingdom", "GB"),
      ("greatbritain", "GB"),
      ("britain", "GB"),
      ("deutschland", "DE"),
      ("germany", "DE"),
      ("mainlandchina", "CN"),
      ("prc", "CN"),
      ("peoplesrepublicofchina", "CN"),
      ("zhonghuaRenmingongheguo".toLowerCase.replaceAll("[^a-z0-9]", ""), "CN"),
      ("zhonghuaRenminGongHeGuo".toLowerCase.replaceAll("[^a-z0-9]", ""), "CN"),
      ("中华人民共和国".toCharArray.mkString, "CN"), // will not match unless aliasKey applied consistently in table
      ("koreasouth", "KR"),
      ("southkorea", "KR"),
      ("대한민국".toCharArray.mkString, "KR")
    ).toDF("alias_key", "country_code")
  }

val aliasMap = lookupDF
  .filter(col("alias_key").isNotNull && col("country_code").isNotNull)
  .as[(String, String)]
  .collect()
  .toMap

val masterSet =
  if (tableExists(spark, masterTable)) {
    spark.table(masterTable).select("country_code")
      .as[String].collect().map(_.toUpperCase).toSet
  } else {
    // ISO 3166-1 alpha-2 核心示例(生产请用完整表)
    // Core sample; use complete table in production
    Set("US", "GB", "DE", "CN", "KR", "JP", "FR", "IT", "ES", "CA", "AU", "BR", "IN", "RU", "ZA", "MX", "NL", "SE", "CH", "SG", "HK", "TW", "IE", "NZ")
  }

(spark.sparkContext.broadcast(aliasMap), spark.sparkContext.broadcast(masterSet))

}

private def tableExists(spark: SparkSession, table: String): Boolean = {
  val parts = table.split("\\.")
  if (parts.length == 2) {
    spark.catalog.tableExists(parts(0), parts(1))
  } else {
    spark.catalog.tableExists(table)
  }
}

// 创建用于 DataFrame 的标准化列
// Add normalized columns to a DataFrame
def normalizeCountryColumn(
    df: DataFrame,
    rawCol: String = "country_code",
    sourceSystemCol: String = "source_system",
    outCol: String = "country_code_std",
    reasonCol: String = "normalization_reason",
    rawOutCol: String = "raw_country_value",
    normalizedByCol: String = "normalized_by",
    ruleVersionCol: String = "rule_version"
)(implicit spark: SparkSession): DataFrame = {

val (bcAlias, bcMaster) = buildBroadcastMaps(spark)

// UDFs for cleaning and alias key
val canonicalCleanUDF = udf { s: String => canonicalClean(s) }
val aliasKeyUDF = udf { s: String => aliasKey(s) }

// UDF to normalize to ISO-2 code or null
val normalizeUDF = udf { (raw: String) =>
  val master = bcMaster.value
  val alias = bcAlias.value

  def isTwoLetter(code: String): Boolean =
    code != null && code.matches("(?i)^[A-Z]{2}$")

  if (raw == null) {
    (null: String, "missing")
  } else {
    val cleaned = canonicalClean(raw)
    if (cleaned == null || cleaned.isEmpty || cleaned.matches("^[\\p{Punct}\\s]+$")) {
      (null: String, "missing")
    } else {
      // If already a two-letter code and valid
      if (cleaned.matches("(?i)^[A-Za-z]{2}$")) {
        val up = cleaned.toUpperCase
        if (master.contains(up)) {
          (up, "ok")
        } else {
          (null: String, "unknown_alias")
        }
      } else {
        val key = aliasKey(cleaned)
        val mapped = if (key != null) alias.get(key) else None
        mapped match {
          case Some(code) if code != null && code.matches("^[A-Z]{2}$") && bcMaster.value.contains(code) =>
            (code, "ok")
          case _ =>
            (null: String, "unknown_alias")
        }
      }
    }
  }
}

val withRaw = df.withColumn(rawOutCol, col(rawCol))

val withClean = withRaw
  .withColumn("country_cleaned", canonicalCleanUDF(col(rawCol)))
  .withColumn("country_alias_key", aliasKeyUDF(col("country_cleaned")))
  .withColumn("_norm_struct", normalizeUDF(col(rawCol)))
  .withColumn(outCol, col("_norm_struct._1"))
  .withColumn(reasonCol, col("_norm_struct._2"))
  .drop("_norm_struct")
  .withColumn(normalizedByCol, lit(NormalizedBy))
  .withColumn(ruleVersionCol, lit(RuleVersion))

// Ensure uppercase output for safety
val withUpper = withClean.withColumn(outCol,
  when(col(outCol).isNotNull, upper(col(outCol))).otherwise(col(outCol))
)

// Optional: forbid anything not [A-Z]{2}
val validated = withUpper.withColumn(outCol,
  when(col(outCol).isNotNull && col(outCol).rlike("^[A-Z]{2}$"), col(outCol)).otherwise(lit(null:String))
)

// Attach source_system if not provided
val withSource =
  if (validated.columns.contains(sourceSystemCol)) validated
  else validated.withColumn(sourceSystemCol, lit(null:String))

withSource

}

// 将未知别名写入侧写/错误表(Delta/Hive)
// Write unknown aliases to an error/side table (Delta/Hive)
def writeUnknowns(
    normalizedDF: DataFrame,
    errorTable: String = "country_normalization_errors"
): Unit = {
  val unknowns = normalizedDF
    .filter(col("normalization_reason") =!= "ok")
    .select(
      current_timestamp().as("event_time"),
      col("raw_country_value"),
      col("country_cleaned"),
      col("country_alias_key"),
      col("normalization_reason"),
      col("source_system"),
      col("rule_version")
    )
  unknowns.write.mode("append").format("delta").saveAsTable(errorTable)
}

// 批处理示例 Batch example
def runBatch(
    spark: SparkSession,
    inputTable: String = "customer_addresses",   // 输入表需包含 country_code 字段 / input table must contain country_code
    targetTable: String = "customer_addresses_silver",
    errorTable: String = "country_normalization_errors"
): Unit = {
  val input = spark.table(inputTable)
  val normalized = normalizeCountryColumn(input)(spark)

// 写入目标表(仅示例,可改为 merge/upsert)
normalized.write.mode("overwrite").format("delta").saveAsTable(targetTable)

// 写入未知别名
writeUnknowns(normalized, errorTable)

}

// 流式示例(以 Delta 源为例;Kafka 可将 value 解析为 DF 再套用 normalizeCountryColumn)
// Streaming example (Delta source; for Kafka, parse the value column into a DataFrame first)
def runStreaming(
    spark: SparkSession,
    sourcePath: String,                                      // Bronze Delta path
    checkpointDir: String,
    targetTable: String = "customer_addresses_silver_stream",
    errorTable: String = "country_normalization_errors_stream"
): Unit = {
  import spark.implicits._

val streamDF = spark.readStream.format("delta").load(sourcePath)

val normalized = normalizeCountryColumn(streamDF)(spark)

val queryMain = normalized
  .writeStream
  .option("checkpointLocation", s"$checkpointDir/main")
  .outputMode("append")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.mode("append").format("delta").saveAsTable(targetTable)
  }
  .start()

val unknowns = normalized
  .filter(col("normalization_reason") =!= "ok")
  .select(
    current_timestamp().as("event_time"),
    col("raw_country_value"),
    col("country_cleaned"),
    col("country_alias_key"),
    col("normalization_reason"),
    col("source_system"),
    col("rule_version")
  )

val queryErr = unknowns
  .writeStream
  .option("checkpointLocation", s"$checkpointDir/errors")
  .outputMode("append")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.mode("append").format("delta").saveAsTable(errorTable)
  }
  .start()

queryMain.awaitTermination()
queryErr.awaitTermination()

}

// 简单使用示例 Simple usage example
def example(spark: SparkSession): Unit = {
  import spark.implicits._
  val data = Seq(
    ("  united  states ", "crm"),
    ("uk", "web"),
    ("中华人民共和国", "app"),
    ("Korea, South", "erp"),
    ("U.S.", "crm"),
    (null, "crm"),
    ("??", "web"),
    ("deutschland", "erp"),
    ("PRC", "app")
  ).toDF("country_code", "source_system")

val normalized = normalizeCountryColumn(data)(spark)
normalized.select("country_code", "country_code_std", "normalization_reason", "raw_country_value", "normalized_by", "rule_version", "source_system")
  .show(false)
// 期望输出 Expected:
// '  united  states ' -> 'US'
// 'uk' -> 'GB'
// '中华人民共和国' -> 'CN'  (依赖字典 alias_key 收录中文别名;可在 iso_country_lookup 中加入 alias)
// 'Korea, South' -> 'KR'
// 'U.S.' -> 'US'

  }
}

// 备注 Notes:
// - 为确保校验严格,推荐维护完整的 iso_country_master(~249 个 ISO-2 代码)。
//   For strict validation, maintain a complete iso_country_master (~249 ISO-2 codes).
// - iso_country_lookup 中 alias_key 必须用与代码相同的 canonical 规则生成(见 aliasKey)。
//   alias_key values in iso_country_lookup must be generated with the same canonical rule as the code (see aliasKey).
// - 若别名规模较小(数千级),广播 Map 性能良好;若更大,改用 join,见下方示例。
//   A broadcast Map performs well for a few thousand aliases; for larger sets switch to a join, as sketched below:
//     df.join(broadcast(lookup), Seq("country_alias_key"), "left")
// - 在生产中建议以 MERGE 方式更新目标表,并维护数据变更审计日志。
//   In production, update the target table via MERGE and keep a data-change audit log.
// - 本实现不修改原始列,只新增标准化列与审计列,便于回溯。
//   This implementation does not modify the original column; it only adds normalized and audit columns for traceability.
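
To make the join-based alternative in the notes concrete, here is a minimal PySpark sketch (Python, to stay consistent with the first example in this document; the table names follow the recommended lookup schema above, and the inline alias-key UDF is an assumed equivalent of the Scala aliasKey helper):

# Minimal PySpark sketch of the join-based alias mapping (illustrative only).
import re
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("country_normalization_join_demo").getOrCreate()

# Assumed Python equivalent of the Scala aliasKey(): lowercase + strip non-alphanumerics.
alias_key = F.udf(lambda s: re.sub(r"[^a-z0-9]", "", s.strip().lower()) if s else None)

addresses = spark.table("customer_addresses").withColumn(
    "country_alias_key", alias_key(F.col("country_code"))
)

lookup = (
    spark.table("iso_country_lookup")
    .selectExpr("alias_key AS country_alias_key", "country_code AS country_code_std")
)

normalized = (
    addresses
    .join(F.broadcast(lookup), on="country_alias_key", how="left")
    .withColumn(
        "normalization_reason",
        F.when(F.col("country_code_std").isNotNull(), F.lit("ok")).otherwise(F.lit("unknown_alias")),
    )
)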

Example details

Problems it solves

With one reusable, efficient prompt, quickly generate clear, actionable data-normalization rules for a specified table and column: 1) turn scattered naming, inconsistent formats, and outliers into executable cleaning standards; 2) let non-technical members collaborate with a built-in "data engineer", shortening data-preparation cycles; 3) support multilingual output so global teams align on definitions; 4) iterate and reuse the rules to build up enterprise-level field standards, improving the accuracy of reports and analysis; 5) replace large-scale manual cleaning at lower cost, helping data products, BI, and growth projects ship faster.

Intended users

Data engineers

Quickly generate normalization rules for specified table columns, unify date, numeric, and text formats, and shorten cleaning and release cycles.

Data analysts

Agree on field standards before data ingestion to improve report accuracy and reduce manual corrections and repeated validation.

Data governance leads

Build reviewable rule documents and examples, drive unified standards across departments, and reduce compliance risk and disagreements over metric definitions.

Feature summary

Enter a table and column name to get executable normalization rules that quickly resolve common messiness in naming, dates, and numbers.
One-click output of clearly structured rule descriptions and examples, so teams can apply them consistently in day-to-day data preparation.
Automatically recognizes contextual requirements and suggests rules suited to different business scenarios, reducing repeated discussion and rule disputes.
Output language is switchable (Chinese, English, and more) to support cross-team collaboration and cross-department standardization.
Provides copyable validation steps and exception-handling advice to help you spot dirty data quickly and plan fixes.
Template-based parameter input lets you tune rule granularity and coverage, balancing flexibility with practicality.
Built-in professional writing style produces rigorous, readable, structured documentation ready for reviews and training.
Covers field scenarios across industries, such as product codes, customer information, and time records, quickly forming reusable best practices.

How to use the purchased prompt template

1. Use it directly in an external chat app

Copy the prompt generated by the template into your usual chat app (ChatGPT, Claude, etc.) and use it in conversation directly, with no extra development. Suitable for quick personal trials and lightweight scenarios.

2. Publish it as an API

Turn the prompt template into an API: your program can modify the template parameters as needed and call it through the interface, enabling automation and batch processing. Suitable for developer integration and embedding into business systems.
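
For illustration only, calling such an API from Python might look like the sketch below; the endpoint URL, authentication header, and parameter names are entirely hypothetical and must be replaced with the values the platform actually provides:

# Hypothetical example; endpoint, header, and parameter names are placeholders.
import requests

resp = requests.post(
    "https://api.example.com/prompt-templates/column-normalization",  # placeholder URL
    headers={"Authorization": "Bearer <YOUR_API_KEY>"},
    json={
        "table_name": "crm_contacts",       # 表格名称
        "column_name": "full_name",         # 列名称
        "normalization_type": "text",       # 规范化类型
        "output_language": "zh-en",         # 输出语言
        "rule_description": True,           # 规则说明
        "generate_example_code": True,      # 示例代码生成
    },
    timeout=30,
)
print(resp.json())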

3. Configure it in an MCP client

Configure the corresponding server address in your MCP client so that your AI applications can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting the prompt move seamlessly between AI tools.

AI prompt price
¥25.00
Try before you buy: pay only after it works for you.

What you get after purchase

The complete prompt template
- 287 tokens in total
- 6 adjustable parameters
{ 表格名称 (table name) } { 列名称 (column name) } { 规范化类型 (normalization type) } { 输出语言 (output language) } { 规则说明 (rule description) } { 示例代码生成 (example code generation) }
Usage rights to community-contributed content
- Curated, high-quality community examples to help you get up to speed with the prompt quickly