🔥 Member Exclusive · Text-to-Text · Data Processing

Column Data Normalization

👁️ 397 views
📅 Nov 24, 2025
💡 Core value: Generates professional data normalization rules for a specified table column, providing technical solutions that keep data preprocessing, cleaning, and pipeline building accurate and consistent, improving data quality and data engineering efficiency.

🎯 Customizable Parameters (6)

Table Name
The name of the table to normalize
Column Name
The column to normalize
Normalization Type
The type of data normalization to apply
Output Language
The programming language for the generated code
Rule Notes
Custom notes or comments for the normalization rules
Sample Code Generation
Whether to generate sample code
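
For orientation, here is a hypothetical fill-in of the six parameters that would yield the first example below (the key names are illustrative, not the template's actual field identifiers):

    params = {
        "table_name": "crm_contacts",
        "column_name": "full_name",
        "normalization_type": "name normalization",
        "output_language": "Python",
        "rule_notes": "bilingual (Chinese-English) rule descriptions",
        "generate_sample_code": True,
    }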

🎨 Example Output

""" 规则版本: name_normalization.v1

Goal:

  • Normalize crm_contacts.full_name into a comparable canonical form for bilingual (Chinese-English) collaboration.

Rules:

  1. Pre-cleaning:
    • Trim leading/trailing whitespace; collapse runs of spaces into one.
    • Remove HTML tags, control characters (Cc), and emoji.
    • Record the digit ratio of the raw value; if it exceeds 20%, flag as invalid:numeric_ratio and return NULL.
    • Remove Arabic numerals and trailing punctuation (.,;:_-); keep legal in-name separators (-, ·, ’).
  2. Case normalization:
    • Apply Title Case to Latin characters; preserve legal apostrophes and hyphens (e.g., McDonald, O’Connor, Anna-Maria).
    • Leave non-Latin/Chinese text as written; only normalize redundant spaces and symbols.
  3. Symbol standardization:
    • Unify full-width/half-width forms, quotes, and hyphens: standardize apostrophe variants to ’ (U+2019) and hyphens to - (U+002D); keep in-name separators (-, ·, ’).
  4. Cultural handling:
    • Chinese names are never split or reordered (e.g., 王小明).
    • Western names follow Firstname Lastname; middle initials and honorifics keep their positions.
  5. Deduplication:
    • Group by the SHA-256 of the normalized string.
    • Keep the latest record (max updated_at); aggregate source IDs and occurrence counts.
  6. Missing values:
    • Empty strings, whitespace-only values, or values that are empty after cleaning become NULL.
    • Write the reason (empty/invalid) to the audit column; notes may be attached.
  7. Whitelist & exceptions:
    • Preserve legal prefixes/suffixes (e.g., Dr., Ms., PhD), maintained and standardized in the lookup_names_titles dictionary table.
  8. Output:
    • Produce bilingual (Chinese-English) rule descriptions plus Python sample code; the code is function-based, idempotent, and reusable, callable from Airflow/Prefect tasks.
  9. Validation:
    • The normalized length must be 1-120 characters; longer values are truncated to 120 with audit reason truncated plus notes.
  10. Versioning:
    • The rule version is fixed at name_normalization.v1; later additions must not break compatibility.

Examples:

  • ' ms. anna o’connor ' -> 'Ms. Anna O’Connor'
  • '张三' -> '张三'
"""

from __future__ import annotations

import hashlib
import re
import unicodedata
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Tuple

RULE_VERSION = "name_normalization.v1"

# Title whitelist/lookup dictionary table (extensible)

lookup_names_titles: Dict[str, Dict[str, str]] = {
    "prefix": {
        # English
        "mr": "Mr.", "mrs": "Mrs.", "ms": "Ms.", "miss": "Ms.",
        "dr": "Dr.", "prof": "Prof.", "sir": "Sir", "madam": "Ms.", "mx": "Mx.",
        # Chinese-to-English mappings (optional normalization)
        "先生": "Mr.", "女士": "Ms.", "小姐": "Ms.", "博士": "Dr.", "教授": "Prof.",
    },
    "suffix": {
        # English
        "phd": "PhD", "md": "MD", "dds": "DDS", "dvm": "DVM", "jd": "JD",
        "mba": "MBA", "jr": "Jr.", "sr": "Sr.", "iii": "III", "iv": "IV",
        # Chinese degrees could be mapped if desired; left empty to avoid lossy conversion
    },
}

# Emoji ranges for broad removal (BMP + supplementary planes)

_EMOJI_PATTERN = re.compile(
    "["
    "\U0001F600-\U0001F64F"  # Emoticons
    "\U0001F300-\U0001F5FF"  # Misc Symbols and Pictographs
    "\U0001F680-\U0001F6FF"  # Transport & Map
    "\U0001F1E6-\U0001F1FF"  # Regional Indicators
    "\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
    "\U0001FA70-\U0001FAFF"  # Symbols & Pictographs Extended-A
    "\U00002700-\U000027BF"  # Dingbats
    "\U00002600-\U000026FF"  # Misc symbols
    "]",
    flags=re.UNICODE,
)

_HTML_PATTERN = re.compile(r"<[^>]*>")
_MULTI_SPACE_PATTERN = re.compile(r"\s+")

# Allowed middle separators inside names

_ALLOWED_SEPARATORS = {"-", "·", "’"} # hyphen, middle dot, right single quotation mark

# Trailing punctuation to strip from the end of the string (rule 1 lists .,;:_-)

_TRAILING_PUNCT_CHARS = ".,;:_-"

def _strip_html(s: str) -> str:
    return _HTML_PATTERN.sub("", s)

def _remove_emoji(s: str) -> str:
    return _EMOJI_PATTERN.sub("", s)

def _remove_control_chars(s: str) -> str:
    # Remove Unicode category Cc (control characters)
    return "".join(ch for ch in s if unicodedata.category(ch) != "Cc")

def _normalize_width_and_symbols(s: str) -> str:
    # Normalize width (full-width -> half-width) and standardize symbols
    s = unicodedata.normalize("NFKC", s)
    # Standardize apostrophe variants to U+2019
    s = s.replace("\u2018", "’").replace("'", "’").replace("`", "’")
    # Standardize dash variants to ASCII hyphen-minus
    s = s.replace("\u2013", "-").replace("\u2014", "-").replace("\u2212", "-")
    # Leave the middle dot as-is
    return s

def _strip_trailing_punct(s: str) -> str:
    return s.rstrip(_TRAILING_PUNCT_CHARS)

def _collapse_spaces(s: str) -> str:
    s = s.strip()
    return _MULTI_SPACE_PATTERN.sub(" ", s)

def _digit_ratio(s: str) -> float:
    if not s:
        return 0.0
    total = sum(1 for ch in s if not ch.isspace())
    digits = sum(1 for ch in s if ch.isdigit())
    return digits / total if total > 0 else 0.0

def _is_latin_token(token: str) -> bool:
    # A token counts as Latin if every letter is from a Latin script;
    # allowed separators, periods, and spaces are ignored.
    for ch in token:
        if ch.isalpha():
            try:
                name = unicodedata.name(ch)
            except ValueError:
                return False
            if "LATIN" not in name:
                return False
        elif ch in _ALLOWED_SEPARATORS or ch in {".", " "}:
            continue
        else:
            # Other characters, e.g., Chinese or Cyrillic
            return False
    return True

def _titlecase_latin_token(token: str) -> str:
    # Break on allowed separators and apply casing per segment
    def tc_segment(seg: str) -> str:
        if not seg:
            return seg
        lower = seg.lower()
        # Preserve Mc/Mac style (basic heuristic)
        if lower.startswith("mc") and len(lower) > 2:
            return "Mc" + lower[2:3].upper() + lower[3:]
        # Keep single-letter initials uppercase (e.g., "j." -> "J.")
        if len(seg) == 1:
            return seg.upper()
        return lower[0].upper() + lower[1:]

    # Tokens may contain periods (e.g., "dr."); those are handled by the title-mapping step.
    # Split on apostrophes and hyphens to title-case each segment.
    parts = token.split("’")
    for i, part in enumerate(parts):
        parts[i] = "-".join(tc_segment(sp) for sp in part.split("-"))
    return "’".join(parts)

def _standardize_prefix(token: str, title_lookup: Dict[str, Dict[str, str]]) -> Optional[str]:
    # Normalize the token for matching (strip dots)
    candidate = token.replace(".", "")
    return title_lookup.get("prefix", {}).get(candidate.lower())

def _standardize_suffix(token: str, title_lookup: Dict[str, Dict[str, str]]) -> Optional[str]:
    candidate = token.replace(".", "")
    return title_lookup.get("suffix", {}).get(candidate.lower())

def _process_tokens(tokens: List[str], title_lookup: Dict[str, Dict[str, str]]) -> List[str]:
    if not tokens:
        return tokens

    # Handle a prefix title at the first token
    first = tokens[0]
    pref = _standardize_prefix(first, title_lookup)
    if pref:
        tokens[0] = pref
        start_idx = 1
    else:
        start_idx = 0

    # Handle a suffix title at the last token
    last = tokens[-1]
    suff = _standardize_suffix(last, title_lookup)
    if suff:
        tokens[-1] = suff
        end_idx = len(tokens) - 1
    else:
        end_idx = len(tokens)

    # Title-case the middle tokens that are Latin
    for i in range(start_idx, end_idx):
        tok = tokens[i]
        if _is_latin_token(tok):
            tokens[i] = _titlecase_latin_token(tok)
        # Non-Latin tokens (e.g., Chinese) are left as-is; width/symbol
        # normalization has already been applied.
    return tokens

def _remove_digits(s: str) -> str:
    return re.sub(r"\d+", "", s)

def _normalize_name_string(raw: Optional[str], title_lookup: Dict[str, Dict[str, str]]) -> Tuple[Optional[str], Dict[str, Any]]:
    """
    Returns (normalized_name or None, audit_info).
    audit_info contains: reason, notes.
    """
    audit: Dict[str, Any] = {"reason": None, "notes": {}}

    if raw is None:
        audit["reason"] = "empty"
        return None, audit

    original = raw

    # Numeric ratio check on the original text (excluding spaces)
    ratio = _digit_ratio(original)
    if ratio > 0.20:
        audit["reason"] = "invalid:numeric_ratio"
        audit["notes"]["digit_ratio"] = ratio
        return None, audit

    # Pre-cleaning
    s = _strip_html(original)
    s = _remove_control_chars(s)
    s = _remove_emoji(s)
    s = _normalize_width_and_symbols(s)
    s = _collapse_spaces(s)

    # Remove digits (Arabic numerals) per rule
    s = _remove_digits(s)

    # Strip trailing punctuation
    s = _strip_trailing_punct(s)

    # Collapse spaces again in case removal created double spaces
    s = _collapse_spaces(s)

    # Missing value check
    if not s:
        audit["reason"] = "empty"
        return None, audit

    # Tokenize by spaces for prefix/suffix handling and Latin title-case
    tokens = s.split(" ")

    # Standardize titles and apply Latin title-case
    tokens = _process_tokens(tokens, title_lookup)

    normalized = " ".join(tokens)

    # Length validation
    nlen = len(normalized)
    if nlen == 0:
        audit["reason"] = "empty"
        return None, audit
    if nlen > 120:
        normalized = normalized[:120]
        audit["reason"] = "truncated"
        audit["notes"]["original_length"] = nlen
        audit["notes"]["truncated_to"] = 120
    else:
        audit["reason"] = "ok"

    return normalized, audit

def _sha256_hex(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()

def normalize_full_name(
    name: Optional[str],
    *,
    title_lookup: Dict[str, Dict[str, str]] = lookup_names_titles,
) -> Dict[str, Any]:
    """
    Normalize a full name string according to name_normalization.v1.

    Returns a dict:
    {
        "normalized_full_name": Optional[str],
        "full_name_hash": Optional[str],  # SHA-256 of normalized string
        "audit_reason": str,              # "ok", "empty", "invalid:numeric_ratio", "truncated"
        "audit_notes": Dict[str, Any],
        "version": RULE_VERSION,
    }

    Idempotent: applying the function multiple times yields the same result.
    """
    normalized, audit = _normalize_name_string(name, title_lookup)
    result = {
        "normalized_full_name": normalized,
        "full_name_hash": _sha256_hex(normalized) if normalized else None,
        "audit_reason": audit["reason"],
        "audit_notes": audit.get("notes", {}),
        "version": RULE_VERSION,
    }
    return result
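
# A quick sanity check against the Examples above (a minimal sketch; expected values
# follow the rule docstring, and the exact audit_notes contents depend on the input):
#
#   res = normalize_full_name(" ms. anna o’connor ")
#   assert res["normalized_full_name"] == "Ms. Anna O’Connor" and res["audit_reason"] == "ok"
#
#   res = normalize_full_name("张三")
#   assert res["normalized_full_name"] == "张三"
#
#   res = normalize_full_name("   ")
#   assert res["normalized_full_name"] is None and res["audit_reason"] == "empty"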

def _parse_datetime(dt: Any) -> Optional[datetime]:
    if dt is None:
        return None
    if isinstance(dt, datetime):
        return dt
    # Try ISO 8601 strings, supporting a trailing 'Z' for UTC
    if isinstance(dt, str):
        try:
            if dt.endswith("Z"):
                dt = dt[:-1] + "+00:00"
            parsed = datetime.fromisoformat(dt)
        except ValueError:
            return None
        # Convert aware values to UTC and drop tzinfo so they compare safely with datetime.min
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed
    return None

def deduplicate_contacts(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Deduplicate contacts by the SHA-256 hash of normalized_full_name.

    Input records: list of dicts, each containing at least:
        {
            "id": Any,
            "full_name": str,
            "source_id": Any,
            "updated_at": datetime or ISO 8601 str,
            ... (other fields)
        }

    Process:
    - Normalize full_name (apply rules).
    - Group by normalized_full_name hash.
    - Keep the latest record (max updated_at) per group.
    - Summarize source_ids (unique) and occurrences (count).
    - Records with NULL normalized_full_name are excluded from grouping and returned individually with audit.

    Returns list of dicts:
        {
            "id": Any,                                # id of retained (latest) record
            "normalized_full_name": Optional[str],
            "full_name_hash": Optional[str],
            "audit_reason": str,
            "audit_notes": Dict[str, Any],
            "version": RULE_VERSION,
            "source_ids": List[Any],                  # aggregated for grouped records
            "occurrences": int,                       # count in group
            "representative_record": Dict[str, Any],  # latest record snapshot
        }
    """
    groups: Dict[str, Dict[str, Any]] = {}
    invalids: List[Dict[str, Any]] = []

    for rec in records:
        norm = normalize_full_name(rec.get("full_name"))
        updated = _parse_datetime(rec.get("updated_at"))

        enriched = {
            "id": rec.get("id"),
            "normalized_full_name": norm["normalized_full_name"],
            "full_name_hash": norm["full_name_hash"],
            "audit_reason": norm["audit_reason"],
            "audit_notes": norm["audit_notes"],
            "version": RULE_VERSION,
            "source_ids": [rec.get("source_id")],
            "occurrences": 1,
            "representative_record": rec,
            "_updated_at": updated or datetime.min,
        }

        if norm["normalized_full_name"] is None or norm["full_name_hash"] is None:
            # Keep as a standalone invalid/empty record
            invalids.append(enriched)
            continue

        key = norm["full_name_hash"]
        if key not in groups:
            groups[key] = enriched
        else:
            grp = groups[key]
            # Aggregate source_ids and counts
            grp["source_ids"].append(rec.get("source_id"))
            grp["occurrences"] += 1
            # Keep the latest record as the group representative; the group
            # inherits its id, normalized_full_name, and audit fields.
            if enriched["_updated_at"] > grp["_updated_at"]:
                grp["id"] = enriched["id"]
                grp["representative_record"] = rec
                grp["_updated_at"] = enriched["_updated_at"]

    # Finalize output: unique source_ids, drop internal fields
    output: List[Dict[str, Any]] = []
    for g in groups.values():
        g["source_ids"] = [sid for sid in set(g["source_ids"]) if sid is not None]
        g.pop("_updated_at", None)
        output.append(g)

    # Include invalids individually (no grouping)
    for inv in invalids:
        inv["source_ids"] = [sid for sid in set(inv["source_ids"]) if sid is not None]
        inv.pop("_updated_at", None)
        output.append(inv)

    return output

# Example reusable task wrappers

def process_batch(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Batch processor suitable for Airflow/Prefect tasks:
    - Applies normalization and deduplication.
    - Returns a normalized, deduplicated list with audit info.
    """
    return deduplicate_contacts(records)

def airflow_task(**context) -> List[Dict[str, Any]]:
    """
    Airflow-compatible callable.
    Expects 'records' in context['params'] or via context['ti'].xcom_pull().
    """
    params = context.get("params", {}) if context else {}
    records = params.get("records")
    if records is None and context:
        try:
            records = context["ti"].xcom_pull(task_ids=params.get("upstream_task_id"))
        except Exception:
            records = []
    records = records or []
    return process_batch(records)

def prefect_task(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Prefect-compatible callable."""
    return process_batch(records)
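
# A minimal orchestration sketch, assuming Prefect 2.x is available (the flow/task
# names below are illustrative, not part of this module):
#
#   from prefect import flow, task
#
#   @task(name="normalize_contacts")
#   def normalize_contacts_task(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
#       return prefect_task(records)
#
#   @flow
#   def contacts_flow(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
#       return normalize_contacts_task(records)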

# ---- Demonstration ----

def _demo():
    samples = [
        {"id": 1, "full_name": " ms. anna o’connor ", "source_id": "crm_a", "updated_at": "2024-01-02T12:00:00Z"},
        {"id": 2, "full_name": "Ms anna O'CONNOR", "source_id": "crm_b", "updated_at": "2024-03-05T09:30:00Z"},
        {"id": 3, "full_name": "张三", "source_id": "crm_c", "updated_at": "2024-05-01T08:00:00Z"},
        {"id": 4, "full_name": "Dr john mcDonald phd", "source_id": "crm_d", "updated_at": "2024-02-02T10:00:00Z"},
        {"id": 5, "full_name": " ", "source_id": "crm_e", "updated_at": "2024-02-03T10:00:00Z"},
        {"id": 6, "full_name": "Anna 1234 O’Connor 5678", "source_id": "crm_f", "updated_at": "2024-02-04T10:00:00Z"},
    ]
    for r in process_batch(samples):
        print(r)

if __name__ == "__main__":
    _demo()

-- date_normalization.v1
-- Goal: normalize ecommerce_orders.order_date to ISO 8601 YYYY-MM-DD; standardize timezones and handle bad values.
-- Output: rule notes (as comments) plus executable SQL (cleaning, parsing, validation, conversion, audit columns).
-- Rules:
-- 1) Pre-cleaning: strip extra spaces and textual noise (e.g., 'Date:', bracketed notes), unify separators
--    (/ . - -> -), remove ordinal suffixes (st/nd/rd/th).
-- 2) Input formats: parse by locale preference (this example: EU/DMY; switch to MDY if needed); supported:
--    - 'YYYY-MM-DD', 'DD/MM/YYYY', 'MM-DD-YYYY', 'YYYY/MM/DD'
--    - English natural language: 'Mar 4, 2025' / 'March 4, 2025'
--    - with time and timezone: '2025-03-04T23:30:00-05:00', '2025/03/04 23:30:00 PST'
--    Safe fallback: multi-pattern TRY parsing; if all patterns fail, set NULL.
-- 3) Timezone unification: if the string carries a timezone, take the date in that local time; if it has a time
--    but no timezone, interpret it as local time in the configured timezone.
--    Early-morning rollover window: local times 00:00-01:59 are assigned to the previous day (local date minus 1).
--    Only the date part (YYYY-MM-DD) is stored.
-- 4) Invalid-date validation: leap years and month/day ranges are guaranteed by the TRY_ functions; '0000-00-00',
--    '99/99/9999', and digit strings that form no valid date become NULL with reason invalid_date.
-- 5) Missing values: 'N/A', 'unknown', empty strings, or values empty after cleaning -> NULL with reason missing;
--    the raw value is kept in raw_order_date for traceability.
-- 6) Audit columns: raw_order_date, order_date_parse_format, order_date_src_timezone, order_date_invalid_reason,
--    normalization_version.
-- 7) Usable as a view, or with UPDATE/INSERT INTO to persist normalized values.

-- Note: this example uses the Snowflake SQL dialect; on other engines, substitute the equivalent functions
-- (TRY_TO_DATE / TRY_TO_TIMESTAMP[_TZ], CONVERT_TIMEZONE, REGEXP_REPLACE, etc.).

-- Configurable parsing preference and default timezone (preferred locale: DMY; default TZ: Europe/Berlin).
-- Change locale_preference to 'MDY' for US-style parsing; change preferred_tz to your business region.
WITH cfg AS (
  SELECT
    'DMY'::STRING AS locale_preference,              -- parsing priority
    'Europe/Berlin'::STRING AS preferred_tz,         -- default local timezone
    'date_normalization.v1'::STRING AS version
), src AS (
  SELECT
    t.*,
    CAST(order_date AS STRING) AS raw_order_date     -- raw snapshot of the source field
  FROM ecommerce_orders t
), clean AS (
  SELECT
    s.*,
    -- Pre-cleaning:
    -- 1) trim surrounding whitespace
    -- 2) remove a leading 'Date:' prefix
    -- 3) strip bracketed annotations (...) or [...]
    -- 4) unify separators . and / to -
    -- 5) remove ordinal suffixes (1st/2nd/3rd/4th ...)
    -- 6) collapse multiple spaces
    NULLIF(
      REGEXP_REPLACE(
        REGEXP_REPLACE(
          REGEXP_REPLACE(
            REGEXP_REPLACE(
              REGEXP_REPLACE(
                TRIM(raw_order_date),
                '(?i)^\s*date\s*:\s*', ''        -- remove leading 'Date:'
              ),
              '\s*[(\[].*?[)\]]', ''             -- strip (...) or [...]
            ),
            '[./]', '-'                          -- unify separators to '-'
          ),
          '(?<=\d)(?i)(st|nd|rd|th)\b', ''       -- remove ordinal suffixes
        ),
        '\s+', ' '                               -- collapse multiple spaces
      ),
      ''
    ) AS cleaned_order_date
  FROM src s
), flags AS (
  SELECT
    c.*,
    -- Missing values
    CASE
      WHEN cleaned_order_date IS NULL THEN TRUE
      WHEN UPPER(cleaned_order_date) IN ('N/A', 'NA', 'UNKNOWN', 'NULL') THEN TRUE
      ELSE FALSE
    END AS is_missing,
    -- Obvious sentinel invalids
    CASE
      WHEN cleaned_order_date IS NULL THEN FALSE
      WHEN REGEXP_LIKE(cleaned_order_date, '^(?i)(0000-00-00|99-99-9999)$') THEN TRUE
      WHEN REGEXP_LIKE(REPLACE(cleaned_order_date, '-', ''), '^\d{8}$') THEN FALSE  -- pure 8-digit strings handled separately
      ELSE FALSE
    END AS is_invalid_sentinel,
    -- Rough detection of timezone / time-of-day hints
    REGEXP_LIKE(cleaned_order_date, '(Z\b)|([+-]\d{2}:?\d{2})|\b(UTC|GMT)\b|[A-Za-z]+/[A-Za-z_]+') AS has_tz_hint,
    REGEXP_LIKE(cleaned_order_date, 'T\d{2}:\d{2}|\b\d{1,2}:\d{2}') AS has_time_hint
  FROM clean c
), parsed AS (
  SELECT
    f.*,
    -- Parsing attempts (locale preference first)
    (SELECT locale_preference FROM cfg) AS locale_preference,
    (SELECT preferred_tz FROM cfg) AS preferred_tz,
    (SELECT version FROM cfg) AS normalization_version,

/* 1) Timezone-aware timestamp: if the string carries a timezone hint, try TRY_TO_TIMESTAMP_TZ first */
CASE
  WHEN has_tz_hint THEN
    COALESCE(
      TRY_TO_TIMESTAMP_TZ(cleaned_order_date),
      TRY_TO_TIMESTAMP_TZ(REGEXP_REPLACE(cleaned_order_date, ' ', 'T'))
    )
  ELSE NULL
END AS ts_tz,

/* 2) Local timestamp without a timezone (interpreted as local time in preferred_tz; the value is
   timezone-naive in Snowflake, and the hour window below is judged on this local time) */
COALESCE(
  /* ISO with time */
  TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD"T"HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD HH24:MI'),
  /* DMY/MDY with time (separators already unified to -) */
  CASE WHEN (SELECT locale_preference FROM cfg)='DMY' THEN
    COALESCE(
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI:SS'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI:SS'),  -- fallback
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI')
    )
  ELSE
    COALESCE(
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI:SS'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI'),
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI:SS'),  -- fallback
      TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI')
    )
  END,
  /* Natural language with time (rare): 'Mar 4, 2025 13:45:00' */
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Mon DD, YYYY HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Mon DD, YYYY HH24:MI'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Month DD, YYYY HH24:MI:SS'),
  TRY_TO_TIMESTAMP(cleaned_order_date, 'Month DD, YYYY HH24:MI')
) AS ts_local_ntz,

/* 3) Date-only (no time component) */
COALESCE(
  TRY_TO_DATE(cleaned_order_date, 'YYYY-MM-DD'),
  CASE WHEN (SELECT locale_preference FROM cfg)='DMY' THEN
    COALESCE(
      TRY_TO_DATE(cleaned_order_date, 'DD-MM-YYYY'),
      TRY_TO_DATE(cleaned_order_date, 'MM-DD-YYYY')
    )
  ELSE
    COALESCE(
      TRY_TO_DATE(cleaned_order_date, 'MM-DD-YYYY'),
      TRY_TO_DATE(cleaned_order_date, 'DD-MM-YYYY')
    )
  END,
  TRY_TO_DATE(cleaned_order_date, 'Mon DD, YYYY'),
  TRY_TO_DATE(cleaned_order_date, 'Month DD, YYYY')
) AS d_only

FROM flags f
), evaluated AS (
SELECT p.*,

/* Parse-format tag for auditing */
CASE
  WHEN is_missing THEN 'MISSING'
  WHEN is_invalid_sentinel THEN 'SENTINEL_INVALID'
  WHEN ts_tz IS NOT NULL THEN 'TIMESTAMP_TZ_AUTO'
  WHEN ts_local_ntz IS NOT NULL THEN
    CASE
      WHEN REGEXP_LIKE(cleaned_order_date, '^\d{4}-\d{2}-\d{2}(?:[ T]\d{2}:\d{2}(:\d{2})?)?$') THEN 'ISO_LOCAL_TS'
      WHEN locale_preference='DMY' AND REGEXP_LIKE(cleaned_order_date, '^\d{2}-\d{2}-\d{4}') THEN 'DMY_LOCAL_TS'
      WHEN locale_preference='MDY' AND REGEXP_LIKE(cleaned_order_date, '^\d{2}-\d{2}-\d{4}') THEN 'MDY_LOCAL_TS'
      ELSE 'NATURAL_LANG_LOCAL_TS'
    END
  WHEN d_only IS NOT NULL THEN
    CASE
      WHEN REGEXP_LIKE(cleaned_order_date, '^\d{4}-\d{2}-\d{2}$') THEN 'ISO_DATE'
      WHEN REGEXP_LIKE(cleaned_order_date, '^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\b', 'i') THEN 'NATURAL_LANG_DATE'
      ELSE CASE WHEN locale_preference='DMY' THEN 'DMY_DATE' ELSE 'MDY_DATE' END
    END
  ELSE 'UNPARSEABLE'
END AS order_date_parse_format,

/* Source timezone label */
CASE
  WHEN ts_tz IS NOT NULL THEN
    -- Represent the source timezone as an offset in +HH:MM / -HH:MM format
    LPAD(
      IFF(
        (DATE_PART('timezone_hour', ts_tz) * 60 + DATE_PART('timezone_minute', ts_tz)) < 0,
        '-', '+'
      ) || LPAD(TO_VARCHAR(ABS(DATE_PART('timezone_hour', ts_tz))), 2, '0') || ':' ||
      LPAD(TO_VARCHAR(ABS(DATE_PART('timezone_minute', ts_tz))), 2, '0'),
    6, '+') -- ensures string shape
  WHEN ts_local_ntz IS NOT NULL THEN (SELECT preferred_tz FROM cfg)
  ELSE NULL
END AS order_date_src_timezone,

/* Local date with the 00:00-01:59 early-morning backshift */
CASE
  WHEN ts_tz IS NOT NULL THEN
    CAST(ts_tz AS DATE)
    - IFF(DATE_PART('hour', ts_tz) BETWEEN 0 AND 1, 1, 0)
  WHEN ts_local_ntz IS NOT NULL THEN
    CAST(ts_local_ntz AS DATE)
    - IFF(DATE_PART('hour', ts_local_ntz) BETWEEN 0 AND 1, 1, 0)
  ELSE
    d_only
END AS order_date_norm_candidate

FROM parsed p
), validated AS (
SELECT e.*,

/* Safe fallback for pure 8-digit numbers (YYYYMMDD, or DDMMYYYY / MMDDYYYY) */
CASE
  WHEN order_date_norm_candidate IS NULL
       AND NOT is_missing
       AND NOT is_invalid_sentinel
       AND REGEXP_LIKE(REPLACE(cleaned_order_date, '-', ''), '^\d{8}$')
  THEN
    /* Prefer YMD first, then DMY/MDY per the configured preference */
    COALESCE(
      TRY_TO_DATE(cleaned_order_date, 'YYYYMMDD'),
      CASE WHEN locale_preference='DMY' THEN
        COALESCE(TRY_TO_DATE(cleaned_order_date, 'DDMMYYYY'), TRY_TO_DATE(cleaned_order_date, 'MMDDYYYY'))
      ELSE
        COALESCE(TRY_TO_DATE(cleaned_order_date, 'MMDDYYYY'), TRY_TO_DATE(cleaned_order_date, 'DDMMYYYY'))
      END
    )
  ELSE NULL
END AS yyyymmdd_fallback

FROM evaluated e
), finalized AS (
SELECT v.*,
COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) AS order_date_normalized,

CASE
  WHEN is_missing THEN 'missing'
  WHEN is_invalid_sentinel THEN 'invalid_date_sentinel'
  WHEN COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) IS NULL THEN 'invalid_date'
  ELSE NULL
END AS order_date_invalid_reason,

CASE
  WHEN COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) IS NOT NULL THEN 'ok'
  ELSE 'null'
END AS order_date_normalization_status

FROM validated v
)

-- Option A: create a read-only normalization view (recommended first, for validation).
-- The output carries the audit columns and the normalized date; the source table is not modified.
SELECT
  /* All original columns */
  f.* EXCLUDE (
    raw_order_date, cleaned_order_date, is_missing, is_invalid_sentinel, has_tz_hint, has_time_hint,
    locale_preference, preferred_tz, normalization_version, ts_tz, ts_local_ntz, d_only,
    order_date_parse_format, order_date_src_timezone, order_date_norm_candidate, yyyymmdd_fallback,
    order_date_normalized, order_date_invalid_reason, order_date_normalization_status
  ),

  /* Normalized outputs */
  f.raw_order_date,
  f.cleaned_order_date,
  f.order_date_normalized AS normalized_order_date,  -- final ISO date (YYYY-MM-DD)
  f.order_date_parse_format,
  f.order_date_src_timezone,
  f.order_date_invalid_reason,
  f.order_date_normalization_status,
  f.normalization_version
FROM finalized f;

-- Option B: persist the normalized results (example UPDATE; add the audit columns beforehand).
-- ALTER TABLE ecommerce_orders
--   ADD COLUMN IF NOT EXISTS raw_order_date_audit STRING,
--   ADD COLUMN IF NOT EXISTS normalized_order_date DATE,
--   ADD COLUMN IF NOT EXISTS order_date_parse_format STRING,
--   ADD COLUMN IF NOT EXISTS order_date_src_timezone STRING,
--   ADD COLUMN IF NOT EXISTS order_date_invalid_reason STRING,
--   ADD COLUMN IF NOT EXISTS order_date_normalization_status STRING,
--   ADD COLUMN IF NOT EXISTS normalization_version STRING;

-- Example MERGE keyed on the primary key id (replace with the actual PK):
-- MERGE INTO ecommerce_orders t
-- USING (
--   WITH cfg AS (... same as above ...)
--   SELECT
--     <primary_key> AS pk,
--     raw_order_date,
--     order_date_normalized,
--     order_date_parse_format,
--     order_date_src_timezone,
--     order_date_invalid_reason,
--     order_date_normalization_status,
--     normalization_version
--   FROM finalized
-- ) s
-- ON t.<primary_key> = s.pk
-- WHEN MATCHED THEN UPDATE SET
--   t.raw_order_date_audit = s.raw_order_date,
--   t.normalized_order_date = s.order_date_normalized,
--   t.order_date_parse_format = s.order_date_parse_format,
--   t.order_date_src_timezone = s.order_date_src_timezone,
--   t.order_date_invalid_reason = s.order_date_invalid_reason,
--   t.order_date_normalization_status = s.order_date_normalization_status,
--   t.normalization_version = s.normalization_version;

-- Example validation (expected):
-- '03/04/2025' (EU DMY)          -> '2025-04-03'
-- 'Mar 4, 2025'                  -> '2025-03-04'
-- '2025-03-04T00:30:00-05:00'    -> local 00:30 falls in the rollover window -> '2025-03-03'
-- '0000-00-00'                   -> NULL, invalid_date_sentinel
-- 'N/A'                          -> NULL, missing

// Rule description
// Goal:
//   Normalize customer_addresses.country_code to ISO 3166-1 alpha-2 (uppercase), supporting multilingual and
//   colloquial aliases.
//
// Rules:
// 1) Text cleaning:
//    - Trim and collapse multiple spaces; convert full-width to half-width; normalize common separators to a
//      single space; remove trailing punctuation and trailing bracketed notes ((), [], {}, （）).
// 2) Casing:
//    - Input is case-insensitive; output must be two uppercase letters [A-Z]{2}.
// 3) Alias mapping:
//    - Map multilingual/colloquial aliases to standard codes via the maintainable dictionary table
//      iso_country_lookup, matched by alias_key (the cleaned, canonicalized key).
//    - Examples: 'USA'/'U.S.'/'United States' -> 'US'; 'UK'/'United Kingdom' -> 'GB'; 'Deutschland' -> 'DE';
//      'Mainland China'/'PRC' -> 'CN'; 'Korea, South' -> 'KR'.
//    - The lookup should include Chinese, English, and common local-language aliases.
// 4) Missing values:
//    - NULL, empty, or punctuation/whitespace-only input -> NULL with reason 'missing'; keep raw_value.
// 5) Validation:
//    - If the input is already a valid ISO code (matches [A-Za-z]{2} and exists in the master list),
//      uppercase and keep.
//    - Otherwise try the alias mapping; if still not found -> NULL with reason 'unknown_alias'.
// 6) Auditing:
//    - Add fields: normalized_by, rule_version (country_normalization.v1), source_system, raw_country_value,
//      normalization_reason.
// 7) Output & pipeline:
//    - Provide Spark/Scala batch and streaming examples; write unknown aliases to an error/side table for
//      dictionary curation and backfill.
//
// Recommended lookup schema (Delta/Hive):
//   iso_country_lookup(alias_key STRING, country_code STRING, alias_text STRING, lang STRING, updated_at TIMESTAMP)
//   iso_country_master(country_code STRING)  -- master table containing only valid ISO-2 codes
//
// ---------------------------------------------------------------------------------------
// Spark/Scala implementation. Dependencies: Spark 3.x; Structured Streaming optional.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.broadcast.Broadcast

object CountryNormalization {

  // Audit metadata
  val RuleVersion = "country_normalization.v1"
  val NormalizedBy = "de_pipeline"

  // Convert full-width characters to half-width
  private def toHalfWidth(s: String): String = {
    if (s == null) return null
    val sb = new StringBuilder(s.length)
    s.foreach { ch =>
      ch match {
        case '\u3000' => sb.append(' ') // full-width space
        case c if c >= '\uFF01' && c <= '\uFF5E' => sb.append((c - 0xFEE0).toChar) // full-width ASCII block -> ASCII
        case other => sb.append(other)
      }
    }
    sb.toString()
  }

  // Text cleanup: trim, collapse spaces, strip trailing bracketed notes, strip trailing punctuation,
  // normalize separators
  private def canonicalClean(s: String): String = {
    if (s == null) return null
    val hw = toHalfWidth(s)
    val trimmed = hw.trim
    if (trimmed.isEmpty) return ""
    // Normalize common separators to a space
    val sepNorm = trimmed
      .replaceAll("[_./·・]", " ")
      .replaceAll("-", " ")
    val collapsed = sepNorm.replaceAll("\\s+", " ")
    // Remove trailing bracketed notes like "China (Mainland)" -> "China"
    val noTrailingNote = collapsed.replaceAll("\\s*[(（\\[【{][^)）\\]】}]*[)）\\]】}]\\s*$", "")
    // Strip trailing punctuation
    noTrailingNote.replaceAll("[\\p{Punct}。、，·・]+$", "").trim
  }

  // Build alias_key: clean, lowercase, drop punctuation/whitespace.
  // Letters and digits of any script are kept so Chinese/Korean aliases survive.
  private def aliasKey(s: String): String = {
    if (s == null) return null
    val cleaned = canonicalClean(s)
    if (cleaned == null || cleaned.isEmpty) return cleaned
    cleaned.toLowerCase.replaceAll("[^\\p{L}\\p{N}]", "")
  }

  // Load and broadcast the alias->code map and the master code set
  def buildBroadcastMaps(
      spark: SparkSession,
      lookupTable: String = "iso_country_lookup",
      masterTable: String = "iso_country_master"
  ): (Broadcast[Map[String, String]], Broadcast[Set[String]]) = {

import spark.implicits._

val lookupDF =
  if (tableExists(spark, lookupTable)) {
    spark.table(lookupTable).select("alias_key", "country_code")
  } else {
    // Minimal demo lookup (use the real table in production).
    // Keys must be written in aliasKey form so they match at lookup time.
    Seq(
      ("usa", "US"),
      ("us", "US"),
      ("unitedstates", "US"),
      ("unitedstatesofamerica", "US"),
      ("uk", "GB"),
      ("unitedkingdom", "GB"),
      ("greatbritain", "GB"),
      ("britain", "GB"),
      ("deutschland", "DE"),
      ("germany", "DE"),
      ("mainlandchina", "CN"),
      ("prc", "CN"),
      ("peoplesrepublicofchina", "CN"),
      ("zhonghuarenmingongheguo", "CN"),
      ("中华人民共和国", "CN"),
      ("koreasouth", "KR"),
      ("southkorea", "KR"),
      ("대한민국", "KR")
    ).toDF("alias_key", "country_code")
  }

val aliasMap = lookupDF
  .filter(col("alias_key").isNotNull && col("country_code").isNotNull)
  .as[(String, String)]
  .collect()
  .toMap

val masterSet =
  if (tableExists(spark, masterTable)) {
    spark.table(masterTable).select("country_code")
      .as[String].collect().map(_.toUpperCase).toSet
  } else {
    // Core ISO 3166-1 alpha-2 sample (use the complete table in production)
    Set("US", "GB", "DE", "CN", "KR", "JP", "FR", "IT", "ES", "CA", "AU", "BR", "IN", "RU", "ZA", "MX", "NL", "SE", "CH", "SG", "HK", "TW", "IE", "NZ")
  }

(spark.sparkContext.broadcast(aliasMap), spark.sparkContext.broadcast(masterSet))

}

  private def tableExists(spark: SparkSession, table: String): Boolean = {
    val parts = table.split("\\.")
    if (parts.length == 2) spark.catalog.tableExists(parts(0), parts(1))
    else spark.catalog.tableExists(table)
  }

  // Add normalized columns to a DataFrame
  def normalizeCountryColumn(
      df: DataFrame,
      rawCol: String = "country_code",
      sourceSystemCol: String = "source_system",
      outCol: String = "country_code_std",
      reasonCol: String = "normalization_reason",
      rawOutCol: String = "raw_country_value",
      normalizedByCol: String = "normalized_by",
      ruleVersionCol: String = "rule_version"
  )(implicit spark: SparkSession): DataFrame = {

val (bcAlias, bcMaster) = buildBroadcastMaps(spark)

// UDFs for cleaning and alias key
val canonicalCleanUDF = udf { s: String => canonicalClean(s) }
val aliasKeyUDF = udf { s: String => aliasKey(s) }

// UDF to normalize to ISO-2 code or null
val normalizeUDF = udf { (raw: String) =>
  val master = bcMaster.value
  val alias = bcAlias.value

  def isTwoLetter(code: String): Boolean =
    code != null && code.matches("(?i)^[A-Z]{2}$")

  if (raw == null) {
    (null: String, "missing")
  } else {
    val cleaned = canonicalClean(raw)
    if (cleaned == null || cleaned.isEmpty || cleaned.matches("^[\\p{Punct}\\s]+$")) {
      (null: String, "missing")
    } else {
      // If already a two-letter code and valid
      if (cleaned.matches("(?i)^[A-Za-z]{2}$")) {
        val up = cleaned.toUpperCase
        if (master.contains(up)) {
          (up, "ok")
        } else {
          (null: String, "unknown_alias")
        }
      } else {
        val key = aliasKey(cleaned)
        val mapped = if (key != null) alias.get(key) else None
        mapped match {
          case Some(code) if code != null && code.matches("^[A-Z]{2}$") && bcMaster.value.contains(code) =>
            (code, "ok")
          case _ =>
            (null: String, "unknown_alias")
        }
      }
    }
  }
}

val withRaw = df.withColumn(rawOutCol, col(rawCol))

val withClean = withRaw
  .withColumn("country_cleaned", canonicalCleanUDF(col(rawCol)))
  .withColumn("country_alias_key", aliasKeyUDF(col("country_cleaned")))
  .withColumn("_norm_struct", normalizeUDF(col(rawCol)))
  .withColumn(outCol, col("_norm_struct._1"))
  .withColumn(reasonCol, col("_norm_struct._2"))
  .drop("_norm_struct")
  .withColumn(normalizedByCol, lit(NormalizedBy))
  .withColumn(ruleVersionCol, lit(RuleVersion))

// Ensure uppercase output for safety
val withUpper = withClean.withColumn(outCol,
  when(col(outCol).isNotNull, upper(col(outCol))).otherwise(col(outCol))
)

// Optional: forbid anything not [A-Z]{2}
val validated = withUpper.withColumn(outCol,
  when(col(outCol).isNotNull && col(outCol).rlike("^[A-Z]{2}$"), col(outCol)).otherwise(lit(null:String))
)

// Attach source_system if not provided
val withSource =
  if (validated.columns.contains(sourceSystemCol)) validated
  else validated.withColumn(sourceSystemCol, lit(null:String))

withSource

}

  // Write unknown aliases to an error/side table (Delta/Hive)
  def writeUnknowns(
      normalizedDF: DataFrame,
      errorTable: String = "country_normalization_errors"
  ): Unit = {
    val unknowns = normalizedDF
      .filter(col("normalization_reason") =!= "ok")
      .select(
        current_timestamp().as("event_time"),
        col("raw_country_value"),
        col("country_cleaned"),
        col("country_alias_key"),
        col("normalization_reason"),
        col("source_system"),
        col("rule_version")
      )
    unknowns.write.mode("append").format("delta").saveAsTable(errorTable)
  }

  // Batch example
  def runBatch(
      spark: SparkSession,
      inputTable: String = "customer_addresses",  // the input table must contain a country_code column
      targetTable: String = "customer_addresses_silver",
      errorTable: String = "country_normalization_errors"
  ): Unit = {
    implicit val session: SparkSession = spark
    val input = spark.table(inputTable)
    val normalized = normalizeCountryColumn(input)

    // Write to the target table (example only; switch to merge/upsert as needed)
    normalized.write.mode("overwrite").format("delta").saveAsTable(targetTable)

    // Write unknown aliases
    writeUnknowns(normalized, errorTable)

}

  // Streaming example (Delta source; for Kafka, parse the value column into a DataFrame first,
  // then apply normalizeCountryColumn)
  def runStreaming(
      spark: SparkSession,
      sourcePath: String,  // Bronze Delta path
      checkpointDir: String,
      targetTable: String = "customer_addresses_silver_stream",
      errorTable: String = "country_normalization_errors_stream"
  ): Unit = {
    implicit val session: SparkSession = spark
    import spark.implicits._

val streamDF = spark.readStream.format("delta").load(sourcePath)

val normalized = normalizeCountryColumn(streamDF)

val queryMain = normalized
  .writeStream
  .option("checkpointLocation", s"$checkpointDir/main")
  .outputMode("append")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.mode("append").format("delta").saveAsTable(targetTable)
  }
  .start()

val unknowns = normalized
  .filter(col("normalization_reason") =!= "ok")
  .select(
    current_timestamp().as("event_time"),
    col("raw_country_value"),
    col("country_cleaned"),
    col("country_alias_key"),
    col("normalization_reason"),
    col("source_system"),
    col("rule_version")
  )

val queryErr = unknowns
  .writeStream
  .option("checkpointLocation", s"$checkpointDir/errors")
  .outputMode("append")
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.write.mode("append").format("delta").saveAsTable(errorTable)
  }
  .start()

queryMain.awaitTermination()
queryErr.awaitTermination()

}

  // Simple usage example
  def example(spark: SparkSession): Unit = {
    implicit val session: SparkSession = spark
    import spark.implicits._
    val data = Seq(
      (" united states ", "crm"),
      ("uk", "web"),
      ("中华人民共和国", "app"),
      ("Korea, South", "erp"),
      ("U.S.", "crm"),
      (null, "crm"),
      ("??", "web"),
      ("deutschland", "erp"),
      ("PRC", "app")
    ).toDF("country_code", "source_system")

val normalized = normalizeCountryColumn(data)
normalized.select("country_code", "country_code_std", "normalization_reason", "raw_country_value", "normalized_by", "rule_version", "source_system")
  .show(false)
    // Expected:
    // ' united states ' -> 'US'
    // 'uk' -> 'GB'
    // '中华人民共和国' -> 'CN'  (requires the Chinese alias_key in iso_country_lookup / the demo map)
    // 'Korea, South' -> 'KR'
    // 'U.S.' -> 'US'

  }
}

// Notes:
// - For strict validation, maintain a complete iso_country_master (~249 ISO-2 codes).
// - alias_key values in iso_country_lookup must be generated with the same canonical rules as the code (see aliasKey).
// - A broadcast Map performs well for alias sets up to the low thousands; for larger sets, switch to a join:
//     df.join(broadcast(lookup), Seq("country_alias_key"), "left")
// - In production, prefer MERGE updates into the target table and keep a change-audit log.
// - This implementation never modifies the original column; it only adds normalized and audit columns for traceability.


📖 How to Use

Done in 30 seconds: copy → paste → finished
Instead of spending half an hour chatting and trial-and-erroring with AI, copy these templates validated by thousands of users, change a few {{variables}}, and get professional-grade output immediately. The time saved is enough for two relaxed cups of coffee!
💬 Not sure how to fill the parameters? Let the AI interview you instead
Unsure what a variable should be? Switch to conversation mode with one click, and the AI will guide you step by step like a senior consultant: a few questions, and it generates a customized result that fits your needs. Zero barrier to entry; just start talking.
🚀 No more copy-paste: call prompts directly in Chat
Type / to summon 8000+ expert prompts without switching contexts. The plugin integrates the full prompt library into the Chat input box: based on the current conversation, it recommends the best-fitting prompt and fills in the parameters automatically, putting the entire library at your fingertips. (Coming soon.)
🔌 One API call, and the prompt evolves on its own
Running it once by hand is fine, but a hundred times? Inject variables dynamically through the API, hook into the batch evaluation engine, and let the program iterate toward higher-quality prompt variants automatically. The prompt evolves; you just collect the results.
🤖 Turn it into your own Agent app with one click
Tired of configuring parameters every time? Publish this prompt as a standalone Agent with built-in tools such as image generation and parameter optimization, and share it as a link. Give your team or clients a complete, ready-to-use solution.

✅ Feature Summary

Enter a table and column name to generate executable normalization rules, quickly resolving common messes in naming, dates, and numeric values.
Produces clearly structured rule descriptions and examples in one step, so teams can apply them uniformly in day-to-day data preparation.
Recognizes contextual needs automatically and suggests rules adapted to different business scenarios, reducing back-and-forth and disputes over standards.
Switches output language on demand, supporting Chinese, English, and other team settings to drive cross-department standardization.
Provides copy-ready validation steps and exception-handling advice, helping you spot dirty data quickly and plan its repair.
Template-driven parameter input lets you tune rule granularity and coverage as needed, balancing flexibility with practicality.
Built-in professional writing style yields rigorous, readable, well-structured documentation, usable directly for reviews and training.
Covers fields across industries, such as product codes, customer information, and timestamps, quickly forming reusable best practices.

🎯 Problems It Solves

With one reusable, efficient prompt, quickly generate clear, actionable data normalization rules for a given table and column: 1) turn scattered naming, inconsistent formats, and outliers into executable cleaning standards; 2) let non-technical members collaborate with a built-in "data engineer", shortening the data preparation cycle; 3) support multilingual output so global teams share one standard; 4) iterate and reuse to build up company-level field standards, improving report and analysis accuracy; 5) replace large-scale manual cleaning at lower cost, helping data products, BI, and growth projects ship faster.

🕒 Version History

Version history tracking is coming soon.

💬 User Reviews

The user review and feedback system is coming soon.