Popular roles are more than a source of inspiration; they are productivity assistants. With carefully curated role prompts you can quickly generate high-quality content, spark new ideas, and find the solution that best fits your needs, making creation easier and value more direct.
We keep the role library updated for different user needs, so you can always find a suitable starting point for inspiration.
Generate professional data normalization rules for a specified table column, with technical solutions that keep data preprocessing, cleaning, and pipeline construction accurate and consistent, improving data quality and data engineering efficiency.
""" 规则版本: name_normalization.v1
目标/Goal:
规则/Rules:
Examples:
from __future__ import annotations
import re
import hashlib
import unicodedata
from datetime import datetime
from typing import Dict, List, Optional, Tuple, Any
RULE_VERSION = "name_normalization.v1"
lookup_names_titles: Dict[str, Dict[str, str]] = {
    "prefix": {
        # English
        "mr": "Mr.", "mrs": "Mrs.", "ms": "Ms.", "miss": "Ms.",
        "dr": "Dr.", "prof": "Prof.", "sir": "Sir", "madam": "Ms.", "mx": "Mx.",
        # Chinese to English mappings (optional normalization)
        "先生": "Mr.", "女士": "Ms.", "小姐": "Ms.", "博士": "Dr.", "教授": "Prof.",
    },
    "suffix": {
        # English
        "phd": "PhD", "md": "MD", "dds": "DDS", "dvm": "DVM", "jd": "JD", "mba": "MBA",
        "jr": "Jr.", "sr": "Sr.", "iii": "III", "iv": "IV",
        # Chinese degrees could be mapped if desired; left empty to avoid lossy conversion
    },
}
_EMOJI_PATTERN = re.compile(
    "[" +
    "\U0001F600-\U0001F64F" +  # Emoticons
    "\U0001F300-\U0001F5FF" +  # Misc Symbols and Pictographs
    "\U0001F680-\U0001F6FF" +  # Transport & Map
    "\U0001F1E6-\U0001F1FF" +  # Regional Indicators
    "\U0001F900-\U0001F9FF" +  # Supplemental Symbols and Pictographs
    "\U0001FA70-\U0001FAFF" +  # Symbols & Pictographs Extended-A
    "\U00002700-\U000027BF" +  # Dingbats
    "\U00002600-\U000026FF" +  # Misc symbols
    "]",
    flags=re.UNICODE,
)
_HTML_PATTERN = re.compile(r"<[^>]*>")
_MULTI_SPACE_PATTERN = re.compile(r"\s+")
_ALLOWED_SEPARATORS = {"-", "·", "’"} # hyphen, middle dot, right single quotation mark
_TRAILING_PUNCT_CHARS = ".,;:-"
def _strip_html(s: str) -> str:
    return _HTML_PATTERN.sub("", s)

def _remove_emoji(s: str) -> str:
    return _EMOJI_PATTERN.sub("", s)

def _remove_control_chars(s: str) -> str:
    # Remove Unicode category Cc (control chars)
    return "".join(ch for ch in s if unicodedata.category(ch) != "Cc")

def _normalize_width_and_symbols(s: str) -> str:
    # Normalize width (fullwidth -> halfwidth) and standardize symbols
    s = unicodedata.normalize("NFKC", s)
    # Standardize apostrophes to U+2019
    s = s.replace("\u2018", "’").replace("'", "’").replace("`", "’")
    # Standardize dashes to ASCII hyphen-minus
    s = s.replace("\u2013", "-").replace("\u2014", "-").replace("\u2212", "-")
    # Leave middle dot as is
    return s

def _strip_trailing_punct(s: str) -> str:
    return s.rstrip(_TRAILING_PUNCT_CHARS)

def _collapse_spaces(s: str) -> str:
    s = s.strip()
    s = _MULTI_SPACE_PATTERN.sub(" ", s)
    return s
def _digit_ratio(s: str) -> float:
    if not s:
        return 0.0
    total = sum(1 for ch in s if not ch.isspace())
    digits = sum(1 for ch in s if ch.isdigit())
    return digits / total if total > 0 else 0.0
def _is_latin_token(token: str) -> bool:
    # Consider a token Latin if all letters are from Latin scripts or separators/punct
    for ch in token:
        if ch.isalpha():
            try:
                name = unicodedata.name(ch)
            except ValueError:
                return False
            if "LATIN" not in name:
                return False
        elif ch in _ALLOWED_SEPARATORS or ch in {".", " "}:
            continue
        else:
            # other characters like Chinese/Cyrillic
            return False
    return True
def _titlecase_latin_token(token: str) -> str:
    # Break on allowed separators and apply casing per segment
    def tc_segment(seg: str) -> str:
        if not seg:
            return seg
        lower = seg.lower()
        # Preserve Mc/Mac style (basic heuristics)
        if lower.startswith("mc") and len(lower) > 2:
            return "Mc" + lower[2:3].upper() + lower[3:]
        # Keep single-letter initials uppercase (e.g., "j." -> "J.")
        if len(seg) == 1:
            return seg.upper()
        return lower[0].upper() + lower[1:]

    # Tokens can contain periods (e.g., "dr."); those are handled by the title-mapping step
    # Split by apostrophe and hyphen to title-case each segment
    parts_apostrophe = token.split("’")
    for i, ap in enumerate(parts_apostrophe):
        subparts = [tc_segment(sp) for sp in ap.split("-")]
        parts_apostrophe[i] = "-".join(subparts)
    return "’".join(parts_apostrophe)
def _standardize_prefix(token: str, title_lookup: Dict[str, Dict[str, str]]) -> Optional[str]:
    # Normalize token for matching (strip dots)
    candidate = token.replace(".", "")
    canonical = title_lookup.get("prefix", {}).get(candidate.lower())
    if canonical:
        return canonical
    return None

def _standardize_suffix(token: str, title_lookup: Dict[str, Dict[str, str]]) -> Optional[str]:
    candidate = token.replace(".", "")
    canonical = title_lookup.get("suffix", {}).get(candidate.lower())
    if canonical:
        return canonical
    return None
def _process_tokens(tokens: List[str], title_lookup: Dict[str, Dict[str, str]]) -> List[str]:
    if not tokens:
        return tokens
# Handle prefix at first token
first = tokens[0]
pref = _standardize_prefix(first, title_lookup)
if pref:
tokens[0] = pref
start_idx = 1
else:
start_idx = 0
# Handle suffix at last token
last = tokens[-1]
suff = _standardize_suffix(last, title_lookup)
if suff:
tokens[-1] = suff
end_idx = len(tokens) - 1
else:
end_idx = len(tokens)
# Title-case middle tokens that are Latin
for i in range(start_idx, end_idx):
tok = tokens[i]
if _is_latin_token(tok):
tokens[i] = _titlecase_latin_token(tok)
else:
# Non-Latin: leave as-is (e.g., Chinese), only width/symbol normalization already applied
tokens[i] = tok
return tokens
def _remove_digits(s: str) -> str:
    return re.sub(r"\d+", "", s)
def _normalize_name_string(raw: Optional[str], title_lookup: Dict[str, Dict[str, str]]) -> Tuple[Optional[str], Dict[str, Any]]:
    """
    Returns (normalized_name or None, audit_info)
    audit_info contains: reason, notes
    """
    audit = {"reason": None, "notes": {}}
if raw is None:
audit["reason"] = "empty"
return None, audit
original = raw
# Numeric ratio check on original text (excluding spaces)
ratio = _digit_ratio(original)
if ratio > 0.20:
audit["reason"] = "invalid:numeric_ratio"
audit["notes"]["digit_ratio"] = ratio
return None, audit
# Pre-cleaning
s = _strip_html(original)
s = _remove_control_chars(s)
s = _remove_emoji(s)
s = _normalize_width_and_symbols(s)
s = _collapse_spaces(s)
# Remove digits (Arabic numerals) per rule
s = _remove_digits(s)
# Strip trailing punctuation
s = _strip_trailing_punct(s)
# Space collapse again if removal created double spaces
s = _collapse_spaces(s)
# Missing value check
if not s or s.strip() == "":
audit["reason"] = "empty"
return None, audit
# Tokenization by spaces for prefix/suffix handling and Latin title-case
tokens = s.split(" ")
# Standardize titles and apply Latin title-case
tokens = _process_tokens(tokens, title_lookup)
normalized = " ".join(tokens)
# Length validation
nlen = len(normalized)
if nlen == 0:
audit["reason"] = "empty"
return None, audit
if nlen > 120:
normalized = normalized[:120]
audit["reason"] = "truncated"
audit["notes"]["original_length"] = nlen
audit["notes"]["truncated_to"] = 120
else:
audit["reason"] = "ok"
return normalized, audit
def _sha256_hex(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8")).hexdigest()
def normalize_full_name(
    name: Optional[str],
    *,
    title_lookup: Dict[str, Dict[str, str]] = lookup_names_titles,
) -> Dict[str, Any]:
    """
    Normalize a full name string according to name_normalization.v1.
Returns a dict:
{
"normalized_full_name": Optional[str],
"full_name_hash": Optional[str], # SHA-256 of normalized string
"audit_reason": str, # "ok", "empty", "invalid:numeric_ratio", "truncated"
"audit_notes": Dict[str, Any],
"version": RULE_VERSION,
}
Idempotent: applying the function multiple times yields the same result.
"""
normalized, audit = _normalize_name_string(name, title_lookup)
result = {
"normalized_full_name": normalized,
"full_name_hash": _sha256_hex(normalized) if normalized else None,
"audit_reason": audit["reason"],
"audit_notes": audit.get("notes", {}),
"version": RULE_VERSION,
}
return result
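# Usage illustration (my own quick check of the rules above; the full_name_hash is the
# SHA-256 of the normalized string and is omitted here):
# >>> normalize_full_name("  dr  JOHN mcdonald  phd ")
# {'normalized_full_name': 'Dr. John McDonald PhD', 'audit_reason': 'ok',
#  'version': 'name_normalization.v1', ...}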
def _parse_datetime(dt: Any) -> Optional[datetime]:
    if dt is None:
        return None
    if isinstance(dt, datetime):
        return dt
    # Try ISO 8601
    try:
        # Support 'Z' suffix
        if isinstance(dt, str):
            if dt.endswith("Z"):
                dt = dt[:-1]
            return datetime.fromisoformat(dt)
    except Exception:
        return None
    return None
def deduplicate_contacts(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Deduplicate contacts by normalized_full_name SHA-256 hash.
Input records: list of dicts, each containing at least:
{
"id": Any,
"full_name": str,
"source_id": Any,
"updated_at": datetime or ISO8601 str,
... (other fields)
}
Process:
- Normalize full_name (apply rules).
- Group by normalized_full_name hash.
- Keep the latest record (max updated_at) per group.
- Summarize source_ids (unique) and occurrences (count).
- Records with NULL normalized_full_name are excluded from grouping and returned individually with audit.
Returns list of dicts:
{
"id": Any, # id of retained (latest) record
"normalized_full_name": Optional[str],
"full_name_hash": Optional[str],
"audit_reason": str,
"audit_notes": Dict[str, Any],
"version": RULE_VERSION,
"source_ids": List[Any], # aggregated for grouped records
"occurrences": int, # count in group
"representative_record": Dict[str, Any], # latest record snapshot
}
"""
groups: Dict[str, Dict[str, Any]] = {}
invalids: List[Dict[str, Any]] = []
for rec in records:
norm = normalize_full_name(rec.get("full_name"))
updated = _parse_datetime(rec.get("updated_at"))
enriched = {
"id": rec.get("id"),
"normalized_full_name": norm["normalized_full_name"],
"full_name_hash": norm["full_name_hash"],
"audit_reason": norm["audit_reason"],
"audit_notes": norm["audit_notes"],
"version": RULE_VERSION,
"source_ids": [rec.get("source_id")],
"occurrences": 1,
"representative_record": rec,
"_updated_at": updated or datetime.min,
}
if norm["normalized_full_name"] is None or norm["full_name_hash"] is None:
# keep as standalone invalid/empty
invalids.append(enriched)
continue
key = norm["full_name_hash"]
if key not in groups:
groups[key] = enriched
else:
grp = groups[key]
# Aggregate source_ids
grp["source_ids"].append(rec.get("source_id"))
grp["occurrences"] += 1
# Choose latest record
cur_dt = grp["_updated_at"]
new_dt = enriched["_updated_at"]
if new_dt and new_dt > cur_dt:
grp["id"] = enriched["id"]
grp["representative_record"] = rec
grp["_updated_at"] = new_dt
# Audit reason: if any member truncated, keep "ok"/"truncated" precedence
# Prefer "ok" over "truncated" only if representative not truncated; else keep truncated
# If any invalid slipped here (shouldn't), ignore since norm exists
# Dedup group keeps normalized_full_name from representative
groups[key] = grp
# Finalize output: dedup source_ids unique, remove internal fields
output: List[Dict[str, Any]] = []
for g in groups.values():
g["source_ids"] = [sid for sid in set(g["source_ids"]) if sid is not None]
g.pop("_updated_at", None)
output.append(g)
# Include invalids as independent (no grouping)
for inv in invalids:
inv["source_ids"] = [sid for sid in set(inv["source_ids"]) if sid is not None]
inv.pop("_updated_at", None)
output.append(inv)
return output
def process_batch(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Batch processor suitable for Airflow/Prefect tasks:
    - Applies normalization and deduplication.
    - Returns normalized, deduplicated list with audit info.
    """
    return deduplicate_contacts(records)
def airflow_task(**context) -> List[Dict[str, Any]]:
    """
    Airflow-compatible callable.
    Expects 'records' in context['params'] or context['ti'].xcom_pull().
    """
    params = context.get("params", {}) if context else {}
    records = params.get("records")
    if records is None and context:
        try:
            records = context["ti"].xcom_pull(task_ids=params.get("upstream_task_id"))
        except Exception:
            records = []
    records = records or []
    return process_batch(records)
def prefect_task(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Prefect-compatible callable.
    """
    return process_batch(records)
def _demo():
    samples = [
        {"id": 1, "full_name": " ms. anna o’connor ", "source_id": "crm_a", "updated_at": "2024-01-02T12:00:00Z"},
        {"id": 2, "full_name": "Ms anna O'CONNOR", "source_id": "crm_b", "updated_at": "2024-03-05T09:30:00Z"},
        {"id": 3, "full_name": "张三", "source_id": "crm_c", "updated_at": "2024-05-01T08:00:00Z"},
        {"id": 4, "full_name": "Dr john mcDonald phd", "source_id": "crm_d", "updated_at": "2024-02-02T10:00:00Z"},
        {"id": 5, "full_name": " ", "source_id": "crm_e", "updated_at": "2024-02-03T10:00:00Z"},
        {"id": 6, "full_name": "Anna 1234 O’Connor 5678", "source_id": "crm_f", "updated_at": "2024-02-04T10:00:00Z"},
    ]
    result = process_batch(samples)
    for r in result:
        print(r)
if __name__ == "__main__":
    _demo()
-- Note: this example uses the Snowflake SQL dialect; on other engines, substitute the equivalent functions (TRY_TO_DATE/TRY_TO_TIMESTAMP[_TZ], CONVERT_TIMEZONE, REGEXP_REPLACE, etc.).
-- 可配置解析偏好与默认时区(优先地区/Preferred locale: DMY;默认时区/Default TZ: Europe/Berlin)
-- 将 locale_preference 改为 'MDY' 可按美式优先;preferred_tz 改为业务地区。
-- Set locale_preference to 'MDY' for US-style parsing; set preferred_tz to your business region.
WITH cfg AS (
    SELECT
        'DMY'::STRING                   AS locale_preference,   -- 解析优先顺序/Parsing priority
        'Europe/Berlin'::STRING         AS preferred_tz,        -- 缺省本地时区/Default local timezone
        'date_normalization.v1'::STRING AS version
),
src AS (
    SELECT
        t.*,
        CAST(order_date AS STRING) AS raw_order_date            -- 源字段快照/Raw snapshot
    FROM ecommerce_orders t
),
clean AS (
    SELECT
        s.*,
        -- 预清洗/Pre-cleaning
        -- 1) 去前后空格 Trim
        -- 2) 去前缀噪声 Remove leading 'Date:' prefix
        -- 3) 去括号/方括号注释 Remove bracketed annotations
        -- 4) 统一分隔符 Unify separators . / -> -
        -- 5) 移除序数后缀 Remove ordinal suffixes 1st/2nd/3rd/4th
        -- 6) 多空格压缩 Single-space normalization
        NULLIF(
            REGEXP_REPLACE(
                REGEXP_REPLACE(
                    REGEXP_REPLACE(
                        REGEXP_REPLACE(
                            REGEXP_REPLACE(
                                TRIM(raw_order_date),
                                '^\s*date\s*:\s*', '', 1, 0, 'i'        -- remove leading 'Date:'
                            ),
                            '\s*[(\[][^)\]]*[)\]]', ''                  -- strip (...) or [...]
                        ),
                        '[./]', '-'                                     -- unify separators to '-'
                    ),
                    '(\d)(st|nd|rd|th)\b', '\\1', 1, 0, 'i'             -- remove ordinal suffixes
                ),
                '\s+', ' '                                              -- collapse multiple spaces
            ),
            ''
        ) AS cleaned_order_date
    FROM src s
),
flags AS (
    SELECT
        c.*,
        -- 缺失值识别 Missing values
        CASE
            WHEN cleaned_order_date IS NULL THEN TRUE
            WHEN UPPER(cleaned_order_date) IN ('N/A', 'NA', 'UNKNOWN', 'NULL') THEN TRUE
            ELSE FALSE
        END AS is_missing,
        -- 明显非法模式 Sentinel invalids
        CASE
            WHEN cleaned_order_date IS NULL THEN FALSE
            WHEN REGEXP_LIKE(cleaned_order_date, '(0000-00-00|99-99-9999)', 'i') THEN TRUE
            WHEN REGEXP_LIKE(REPLACE(cleaned_order_date, '-', ''), '\d{8}') THEN FALSE  -- 纯8位数字单独处理/pure 8-digit strings handled separately
            ELSE FALSE
        END AS is_invalid_sentinel,
        -- 粗略检测时区/时间存在性 Rough timezone/time presence hints
        REGEXP_LIKE(cleaned_order_date, '.*((Z\b)|([+-]\d{2}:?\d{2})|\b(UTC|GMT)\b|([A-Za-z]+/[A-Za-z_]+)).*') AS has_tz_hint,
        REGEXP_LIKE(cleaned_order_date, '.*((T\d{2}:\d{2})|(\b\d{1,2}:\d{2})).*') AS has_time_hint
    FROM clean c
),
parsed AS (
    SELECT
        f.*,
        -- 解析尝试/Parsing attempts(按配置优先/locale preference first)
        (SELECT locale_preference FROM cfg) AS locale_preference,
        (SELECT preferred_tz FROM cfg)      AS preferred_tz,
        (SELECT version FROM cfg)           AS normalization_version,
/* 1) 时区感知时间戳:若字符串包含时区提示,优先用 TRY_TO_TIMESTAMP_TZ 自动解析 / Timezone-aware timestamp: if the string carries a timezone hint, prefer TRY_TO_TIMESTAMP_TZ auto-parsing */
CASE
WHEN has_tz_hint THEN
COALESCE(
TRY_TO_TIMESTAMP_TZ(cleaned_order_date),
TRY_TO_TIMESTAMP_TZ(REGEXP_REPLACE(cleaned_order_date, ' ', 'T'))
)
ELSE NULL
END AS ts_tz,
/* 2) 本地无时区时间戳(按 preferred_tz 解释本地时间;Snowflake 中值无时区,小时窗口在该本地时间上判定)/ Local timestamp without timezone: interpreted as local time in preferred_tz; the stored value carries no timezone, and the early-morning window is evaluated on that local time */
COALESCE(
/* ISO with time */
TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD"T"HH24:MI:SS'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD HH24:MI:SS'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'YYYY-MM-DD HH24:MI'),
/* DMY/MDY with time(分隔符已统一为 - / separators already unified to '-') */
CASE WHEN (SELECT locale_preference FROM cfg)='DMY' THEN
COALESCE(
TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI:SS'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI:SS'), -- fallback
TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI')
)
ELSE
COALESCE(
TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI:SS'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'MM-DD-YYYY HH24:MI'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI:SS'), -- fallback
TRY_TO_TIMESTAMP(cleaned_order_date, 'DD-MM-YYYY HH24:MI')
)
END,
/* Natural language with time (rare): 'Mar 4, 2025 13:45:00' */
TRY_TO_TIMESTAMP(cleaned_order_date, 'Mon DD, YYYY HH24:MI:SS'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'Mon DD, YYYY HH24:MI'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'Month DD, YYYY HH24:MI:SS'),
TRY_TO_TIMESTAMP(cleaned_order_date, 'Month DD, YYYY HH24:MI')
) AS ts_local_ntz,
/* 3) 仅日期(无时间)/Date-only */
COALESCE(
TRY_TO_DATE(cleaned_order_date, 'YYYY-MM-DD'),
CASE WHEN (SELECT locale_preference FROM cfg)='DMY' THEN
COALESCE(
TRY_TO_DATE(cleaned_order_date, 'DD-MM-YYYY'),
TRY_TO_DATE(cleaned_order_date, 'MM-DD-YYYY')
)
ELSE
COALESCE(
TRY_TO_DATE(cleaned_order_date, 'MM-DD-YYYY'),
TRY_TO_DATE(cleaned_order_date, 'DD-MM-YYYY')
)
END,
TRY_TO_DATE(cleaned_order_date, 'Mon DD, YYYY'),
TRY_TO_DATE(cleaned_order_date, 'Month DD, YYYY')
) AS d_only
    FROM flags f
),
evaluated AS (
    SELECT
        p.*,
/* 解析格式标记/Parse format tag for auditing */
CASE
WHEN is_missing THEN 'MISSING'
WHEN is_invalid_sentinel THEN 'SENTINEL_INVALID'
WHEN ts_tz IS NOT NULL THEN 'TIMESTAMP_TZ_AUTO'
WHEN ts_local_ntz IS NOT NULL THEN
CASE
WHEN REGEXP_LIKE(cleaned_order_date, '^\d{4}-\d{2}-\d{2}([ T]\d{2}:\d{2}(:\d{2})?)?$') THEN 'ISO_LOCAL_TS'
WHEN locale_preference='DMY' AND REGEXP_LIKE(cleaned_order_date, '^\d{2}-\d{2}-\d{4}.*') THEN 'DMY_LOCAL_TS'
WHEN locale_preference='MDY' AND REGEXP_LIKE(cleaned_order_date, '^\d{2}-\d{2}-\d{4}.*') THEN 'MDY_LOCAL_TS'
ELSE 'NATURAL_LANG_LOCAL_TS'
END
WHEN d_only IS NOT NULL THEN
CASE
WHEN REGEXP_LIKE(cleaned_order_date, '^\d{4}-\d{2}-\d{2}$') THEN 'ISO_DATE'
WHEN REGEXP_LIKE(cleaned_order_date, '^(Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)\b.*', 'i') THEN 'NATURAL_LANG_DATE'
ELSE CASE WHEN locale_preference='DMY' THEN 'DMY_DATE' ELSE 'MDY_DATE' END
END
ELSE 'UNPARSEABLE'
END AS order_date_parse_format,
/* 源时区标记/Source timezone label */
CASE
WHEN ts_tz IS NOT NULL THEN
-- 使用偏移量表示源时区,用 +HH:MM/-HH:MM 格式/Format offset as +HH:MM
LPAD(
IFF(
(DATE_PART('timezone_hour', ts_tz) * 60 + DATE_PART('timezone_minute', ts_tz)) < 0,
'-', '+'
) || LPAD(TO_VARCHAR(ABS(DATE_PART('timezone_hour', ts_tz))), 2, '0') || ':' ||
LPAD(TO_VARCHAR(ABS(DATE_PART('timezone_minute', ts_tz))), 2, '0'),
6, '+') -- ensures string shape
WHEN ts_local_ntz IS NOT NULL THEN (SELECT preferred_tz FROM cfg)
ELSE NULL
END AS order_date_src_timezone,
/* 本地日期与凌晨窗口调整/Local date with 00:00–01:59 backshift */
CASE
WHEN ts_tz IS NOT NULL THEN
CAST(ts_tz AS DATE)
- IFF(DATE_PART('hour', ts_tz) BETWEEN 0 AND 1, 1, 0)
WHEN ts_local_ntz IS NOT NULL THEN
CAST(ts_local_ntz AS DATE)
- IFF(DATE_PART('hour', ts_local_ntz) BETWEEN 0 AND 1, 1, 0)
ELSE
d_only
END AS order_date_norm_candidate
    FROM parsed p
),
validated AS (
    SELECT
        e.*,
/* 对纯8位数字进行安全回退(YYYYMMDD 或 DDMMYYYY/ MMDDYYYY)/Safe fallback for 8-digit numbers */
CASE
WHEN order_date_norm_candidate IS NULL
AND NOT is_missing
AND NOT is_invalid_sentinel
AND REGEXP_LIKE(REPLACE(cleaned_order_date, '-', ''), '^\d{8}$')
THEN
/* 首选 YMD,其次按配置优先 DMY/MDY / Prefer YYYYMMDD first, then DMY or MDY per locale preference */
COALESCE(
TRY_TO_DATE(cleaned_order_date, 'YYYYMMDD'),
CASE WHEN locale_preference='DMY' THEN
COALESCE(TRY_TO_DATE(cleaned_order_date, 'DDMMYYYY'), TRY_TO_DATE(cleaned_order_date, 'MMDDYYYY'))
ELSE
COALESCE(TRY_TO_DATE(cleaned_order_date, 'MMDDYYYY'), TRY_TO_DATE(cleaned_order_date, 'DDMMYYYY'))
END
)
ELSE NULL
END AS yyyymmdd_fallback
    FROM evaluated e
),
finalized AS (
    SELECT
        v.*,
        COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) AS order_date_normalized,
CASE
WHEN is_missing THEN 'missing'
WHEN is_invalid_sentinel THEN 'invalid_date_sentinel'
WHEN COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) IS NULL THEN 'invalid_date'
ELSE NULL
END AS order_date_invalid_reason,
CASE
WHEN COALESCE(v.order_date_norm_candidate, v.yyyymmdd_fallback) IS NOT NULL THEN 'ok'
ELSE 'null'
END AS order_date_normalization_status
    FROM validated v
)
-- 选项A:创建只读规范化视图(推荐先用于验证)/Option A: Create a read-only normalization view (recommended for validation)
-- 视图包含审计列与规范化后的日期;不修改原表。The view adds audit columns and the normalized date without modifying the source table.
SELECT
    /* 原表所有列/All original columns */
    f.* EXCLUDE (
        raw_order_date, cleaned_order_date, is_missing, is_invalid_sentinel,
        has_tz_hint, has_time_hint, locale_preference, preferred_tz, normalization_version,
        ts_tz, ts_local_ntz, d_only, order_date_parse_format, order_date_src_timezone,
        order_date_norm_candidate, yyyymmdd_fallback, order_date_normalized,
        order_date_invalid_reason, order_date_normalization_status
    ),
    /* 规范化输出/Normalized outputs */
    f.raw_order_date,
    f.cleaned_order_date,
    f.order_date_normalized AS normalized_order_date,  -- 最终 ISO 日期/Final ISO date (YYYY-MM-DD)
    f.order_date_parse_format,
    f.order_date_src_timezone,
    f.order_date_invalid_reason,
    f.order_date_normalization_status,
    f.normalization_version
FROM finalized f;
-- MERGE 示例:以主键 id 作为匹配键(请按实际主键替换)
-- Example MERGE using primary key id (replace with actual PK)
-- MERGE INTO ecommerce_orders t
-- USING (
--   WITH cfg AS (... same as above ...)
--   SELECT
--     <primary_key> AS pk,
--     raw_order_date,
--     order_date_normalized,
--     order_date_parse_format,
--     order_date_src_timezone,
--     order_date_invalid_reason,
--     order_date_normalization_status,
--     normalization_version
--   FROM finalized
-- ) s
-- ON t.<primary_key> = s.pk
-- WHEN MATCHED THEN UPDATE SET
--   t.raw_order_date_audit = s.raw_order_date,
--   t.normalized_order_date = s.order_date_normalized,
--   t.order_date_parse_format = s.order_date_parse_format,
--   t.order_date_src_timezone = s.order_date_src_timezone,
--   t.order_date_invalid_reason = s.order_date_invalid_reason,
--   t.order_date_normalization_status = s.order_date_normalization_status,
--   t.normalization_version = s.normalization_version;
-- 示例验证/Examples validation(预期/Expected):
-- '03/04/2025' (European DMY) -> '2025-04-03'
-- 'Mar 4, 2025' -> '2025-03-04'
-- '2025-03-04T00:30:00-05:00' -> local time 00:30, backshifted by the 00:00-01:59 window -> '2025-03-03'
-- '0000-00-00' -> NULL, invalid_date_sentinel
-- 'N/A' -> NULL, missing
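As an engine-agnostic cross-check of the 00:00-01:59 backshift rule used above, here is a minimal Python sketch; the function name and the use of the standard datetime module are illustrative, not part of the generated SQL:

from datetime import datetime, timedelta

def order_date_with_backshift(ts: datetime) -> str:
    # Orders stamped between 00:00 and 01:59 local time are attributed to the previous calendar day.
    d = ts.date()
    if 0 <= ts.hour <= 1:
        d -= timedelta(days=1)
    return d.isoformat()

print(order_date_with_backshift(datetime(2025, 3, 4, 0, 30)))  # 2025-03-03
print(order_date_with_backshift(datetime(2025, 3, 4, 2, 0)))   # 2025-03-04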
// 规则说明(中文 + English)/ Rule description
// 目标 / Goal:
//   将 customer_addresses.country_code 统一为 ISO 3166-1 alpha-2(大写),兼容多语言与俗称别名。
//   Normalize customer_addresses.country_code to ISO 3166-1 alpha-2 (uppercase), supporting multilingual aliases.
//
// 规则 / Rules:
// 1) 文本清理 Text cleaning:
//    - 修剪空格、折叠多空格;统一全角/半角;统一常见分隔符为单空格;去除末尾标点与括号注释。
//    - Trim, collapse multiple spaces; normalize full-width to half-width; normalize separators to single space;
//      remove trailing punctuation and trailing bracketed notes ((), [], {}, ()).
//
// 2) 大小写统一 Casing:
//    - 输入不区分大小写;输出标准代码必须为 [A-Z]{2}(大写)。
//    - Case-insensitive input; output must be two uppercase letters [A-Z]{2}.
//
// 3) 别名映射 Alias mapping:
//    - 使用可维护字典表 iso_country_lookup 将多语言/俗称映射为标准代码;优先通过 alias_key(清洗+规范后的键)匹配。
//    - Use the maintainable dictionary table iso_country_lookup to map aliases to standard codes via alias_key.
//    - 示例 Examples: 'USA'/'U.S.'/'United States' -> 'US';'UK'/'United Kingdom' -> 'GB';
//      'Deutschland' -> 'DE';'Mainland China'/'PRC' -> 'CN';'Korea, South' -> 'KR'。
//    - 字典应包含中文/英文/常见本地语言别名。The lookup should include CN/EN/local-language names.
//
// 4) 缺失值处理 Missing values:
//    - NULL、空串、仅标点/空白 → 输出 NULL;reason = 'missing';记录 raw_value。
//    - NULL/empty/punct-only → NULL with reason 'missing', keep raw_value.
//
// 5) 校验 Validation:
//    - 若输入本身为合法代码(匹配 [A-Za-z]{2},且存在于 master 列表),直接转为大写输出。
//    - 否则尝试别名映射;仍失败 → 输出 NULL;reason = 'unknown_alias'。
//    - If the input is already a valid ISO code (case-insensitive and present in master), uppercase and keep it.
//      Else try the alias mapping; if still not found → NULL with reason 'unknown_alias'.
//
// 6) 变更审计 Auditing:
//    - 增加字段:normalized_by、rule_version(country_normalization.v1)、source_system、raw_country_value、normalization_reason。
//    - Add fields: normalized_by, rule_version (country_normalization.v1), source_system,
//      raw_country_value, normalization_reason.
//
// 7) 输出与管道适配 Output & pipeline:
//    - 提供 Spark/Scala 批处理与流式清洗示例;未知别名写入错误/侧写表以便回填字典。
//    - Provide Spark/Scala batch and streaming examples; write unknown aliases to an error table for curation.
//
// 推荐维表结构 Recommended lookup schema (Delta/Hive):
//   iso_country_lookup(alias_key STRING, country_code STRING, alias_text STRING, lang STRING, updated_at TIMESTAMP)
//   iso_country_master(country_code STRING)  -- 主表,仅含有效 ISO-2 代码/Master table containing only valid ISO-2 codes
//
// ---------------------------------------------------------------------------------------
// Spark/Scala 实现示例 Spark/Scala implementation
// 依赖:Spark 3.x,Structured Streaming 可选。Dependencies: Spark 3.x; Structured Streaming optional.
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.broadcast.Broadcast
object CountryNormalization {
  // 审计元数据 Audit metadata
  val RuleVersion = "country_normalization.v1"
  val NormalizedBy = "de_pipeline"
  // 将全角字符转换为半角 Convert full-width to half-width
  private def toHalfWidth(s: String): String = {
    if (s == null) return null
    val sb = new StringBuilder(s.length)
    s.foreach { ch =>
      ch match {
        case '\u3000' => sb.append(' ') // full-width space
        case c if c >= '\uFF01' && c <= '\uFF5E' => sb.append((c - 0xFEE0).toChar) // ASCII full-width block -> ASCII
        case other => sb.append(other)
      }
    }
    sb.toString()
  }
  // 文本清理:修剪、折叠空格、去括注(尾部)、去尾随标点、统一分隔符
  // Text cleanup: trim, collapse spaces, strip trailing bracketed notes, strip trailing punctuation, normalize separators
  private def canonicalClean(s: String): String = {
    if (s == null) return null
    val hw = toHalfWidth(s)
    val trimmed = hw.trim
    if (trimmed.isEmpty) return ""
    // Normalize common separators to space
    val sepNorm = trimmed
      .replaceAll("[_./·・]", " ")
      .replaceAll("-", " ")
    val collapsed = sepNorm.replaceAll("\\s+", " ")
    // Remove trailing bracketed notes like "China (Mainland)" -> "China"
    val noTrailingNote = collapsed.replaceAll("\\s*[(\\[{(【][^)\\]}】)]*[)\\]}】)]\\s*$", "")
    // Strip trailing punctuation
    val noTrailingPunct = noTrailingNote.replaceAll("[\\p{Punct}.。、,·・]+$", "").trim
    noTrailingPunct
  }
  // 生成 alias_key:清洗后转小写,移除所有非字母数字
  // Build alias_key: clean, lowercase, remove non-alphanumeric
  private def aliasKey(s: String): String = {
    if (s == null) return null
    val cleaned = canonicalClean(s)
    if (cleaned == null || cleaned.isEmpty) return cleaned
    cleaned.toLowerCase.replaceAll("[^a-z0-9]", "")
  }
  // 加载并广播字典映射与主码集合
  // Load and broadcast the alias->code map and the master code set
  def buildBroadcastMaps(
      spark: SparkSession,
      lookupTable: String = "iso_country_lookup",
      masterTable: String = "iso_country_master"
  ): (Broadcast[Map[String, String]], Broadcast[Set[String]]) = {
import spark.implicits._
val lookupDF =
if (tableExists(spark, lookupTable)) {
spark.table(lookupTable).select("alias_key", "country_code")
} else {
// 最小可用示例字典(生产请使用表)
// Minimal demo lookup; use table in production
Seq(
("usa", "US"),
("us", "US"),
("unitedstates", "US"),
("u.s", "US"),
("unitedstatesofamerica", "US"),
("uk", "GB"),
("unitedkingdom", "GB"),
("greatbritain", "GB"),
("britain", "GB"),
("deutschland", "DE"),
("germany", "DE"),
("mainlandchina", "CN"),
("prc", "CN"),
("peoplesrepublicofchina", "CN"),
("zhonghuaRenmingongheguo".toLowerCase.replaceAll("[^a-z0-9]", ""), "CN"),
("zhonghuaRenminGongHeGuo".toLowerCase.replaceAll("[^a-z0-9]", ""), "CN"),
("中华人民共和国".toCharArray.mkString, "CN"), // will not match unless aliasKey applied consistently in table
("koreasouth", "KR"),
("southkorea", "KR"),
("대한민국".toCharArray.mkString, "KR")
).toDF("alias_key", "country_code")
}
val aliasMap = lookupDF
.filter(col("alias_key").isNotNull && col("country_code").isNotNull)
.as[(String, String)]
.collect()
.toMap
val masterSet =
if (tableExists(spark, masterTable)) {
spark.table(masterTable).select("country_code")
.as[String].collect().map(_.toUpperCase).toSet
} else {
// ISO 3166-1 alpha-2 核心示例(生产请用完整表)
// Core sample; use complete table in production
Set("US", "GB", "DE", "CN", "KR", "JP", "FR", "IT", "ES", "CA", "AU", "BR", "IN", "RU", "ZA", "MX", "NL", "SE", "CH", "SG", "HK", "TW", "IE", "NZ")
}
(spark.sparkContext.broadcast(aliasMap), spark.sparkContext.broadcast(masterSet))
}
  private def tableExists(spark: SparkSession, table: String): Boolean = {
    val parts = table.split("\\.")
    if (parts.length == 2) {
      spark.catalog.tableExists(parts(0), parts(1))
    } else {
      spark.catalog.tableExists(table)
    }
  }
  // 创建用于 DataFrame 的标准化列
  // Create normalized columns on a DataFrame
  def normalizeCountryColumn(
      df: DataFrame,
      rawCol: String = "country_code",
      sourceSystemCol: String = "source_system",
      outCol: String = "country_code_std",
      reasonCol: String = "normalization_reason",
      rawOutCol: String = "raw_country_value",
      normalizedByCol: String = "normalized_by",
      ruleVersionCol: String = "rule_version"
  )(implicit spark: SparkSession): DataFrame = {
val (bcAlias, bcMaster) = buildBroadcastMaps(spark)
// UDFs for cleaning and alias key
val canonicalCleanUDF = udf { s: String => canonicalClean(s) }
val aliasKeyUDF = udf { s: String => aliasKey(s) }
// UDF to normalize to ISO-2 code or null
val normalizeUDF = udf { (raw: String) =>
val master = bcMaster.value
val alias = bcAlias.value
def isTwoLetter(code: String): Boolean =
code != null && code.matches("(?i)^[A-Z]{2}$")
if (raw == null) {
(null: String, "missing")
} else {
val cleaned = canonicalClean(raw)
if (cleaned == null || cleaned.isEmpty || cleaned.matches("^[\\p{Punct}\\s]+$")) {
(null: String, "missing")
} else {
// If already a two-letter code and valid
if (cleaned.matches("(?i)^[A-Za-z]{2}$")) {
val up = cleaned.toUpperCase
if (master.contains(up)) {
(up, "ok")
} else {
(null: String, "unknown_alias")
}
} else {
val key = aliasKey(cleaned)
val mapped = if (key != null) alias.get(key) else None
mapped match {
case Some(code) if code != null && code.matches("^[A-Z]{2}$") && bcMaster.value.contains(code) =>
(code, "ok")
case _ =>
(null: String, "unknown_alias")
}
}
}
}
}
val withRaw = df.withColumn(rawOutCol, col(rawCol))
val withClean = withRaw
.withColumn("country_cleaned", canonicalCleanUDF(col(rawCol)))
.withColumn("country_alias_key", aliasKeyUDF(col("country_cleaned")))
.withColumn("_norm_struct", normalizeUDF(col(rawCol)))
.withColumn(outCol, col("_norm_struct._1"))
.withColumn(reasonCol, col("_norm_struct._2"))
.drop("_norm_struct")
.withColumn(normalizedByCol, lit(NormalizedBy))
.withColumn(ruleVersionCol, lit(RuleVersion))
// Ensure uppercase output for safety
val withUpper = withClean.withColumn(outCol,
when(col(outCol).isNotNull, upper(col(outCol))).otherwise(col(outCol))
)
// Optional: forbid anything not [A-Z]{2}
val validated = withUpper.withColumn(outCol,
when(col(outCol).isNotNull && col(outCol).rlike("^[A-Z]{2}$"), col(outCol)).otherwise(lit(null:String))
)
// Attach source_system if not provided
val withSource =
if (validated.columns.contains(sourceSystemCol)) validated
else validated.withColumn(sourceSystemCol, lit(null:String))
withSource
}
  // 将未知别名写入侧写/错误表(Delta/Hive)
  // Write unknown aliases to an error/side table (Delta/Hive)
  def writeUnknowns(
      normalizedDF: DataFrame,
      errorTable: String = "country_normalization_errors"
  ): Unit = {
    val unknowns = normalizedDF
      .filter(col("normalization_reason") =!= "ok")
      .select(
        current_timestamp().as("event_time"),
        col("raw_country_value"),
        col("country_cleaned"),
        col("country_alias_key"),
        col("normalization_reason"),
        col("source_system"),
        col("rule_version")
      )
    unknowns.write.mode("append").format("delta").saveAsTable(errorTable)
  }
  // 批处理示例 Batch example
  def runBatch(
      spark: SparkSession,
      inputTable: String = "customer_addresses", // 输入表需包含 country_code 字段/input table must contain country_code
      targetTable: String = "customer_addresses_silver",
      errorTable: String = "country_normalization_errors"
  ): Unit = {
    val input = spark.table(inputTable)
    val normalized = normalizeCountryColumn(input)(spark)
    // 写入目标表(仅示例,可改为 merge/upsert)Write to the target table (example only; switch to MERGE/upsert as needed)
normalized.write.mode("overwrite").format("delta").saveAsTable(targetTable)
    // 写入未知别名 Write unknown aliases
writeUnknowns(normalized, errorTable)
}
  // 流式示例(以 Delta 源为例;Kafka 可将 value 解析为 DF 再套用 normalizeCountryColumn)
  // Streaming example (Delta source; for Kafka, parse value into a DataFrame first)
  def runStreaming(
      spark: SparkSession,
      sourcePath: String,    // Bronze Delta path
      checkpointDir: String,
      targetTable: String = "customer_addresses_silver_stream",
      errorTable: String = "country_normalization_errors_stream"
  ): Unit = {
    import spark.implicits._
val streamDF = spark.readStream.format("delta").load(sourcePath)
    val normalized = normalizeCountryColumn(streamDF)(spark)
val queryMain = normalized
.writeStream
.option("checkpointLocation", s"$checkpointDir/main")
.outputMode("append")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.mode("append").format("delta").saveAsTable(targetTable)
}
.start()
val unknowns = normalized
.filter(col("normalization_reason") =!= "ok")
.select(
current_timestamp().as("event_time"),
col("raw_country_value"),
col("country_cleaned"),
col("country_alias_key"),
col("normalization_reason"),
col("source_system"),
col("rule_version")
)
val queryErr = unknowns
.writeStream
.option("checkpointLocation", s"$checkpointDir/errors")
.outputMode("append")
.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.mode("append").format("delta").saveAsTable(errorTable)
}
.start()
queryMain.awaitTermination()
queryErr.awaitTermination()
}
  // 简单使用示例 Simple usage example
  def example(spark: SparkSession): Unit = {
    import spark.implicits._
    val data = Seq(
      (" united states ", "crm"),
      ("uk", "web"),
      ("中华人民共和国", "app"),
      ("Korea, South", "erp"),
      ("U.S.", "crm"),
      (null, "crm"),
      ("??", "web"),
      ("deutschland", "erp"),
      ("PRC", "app")
    ).toDF("country_code", "source_system")
    val normalized = normalizeCountryColumn(data)(spark)
normalized.select("country_code", "country_code_std", "normalization_reason", "raw_country_value", "normalized_by", "rule_version", "source_system")
.show(false)
// 期望输出 Expected:
// ' united states ' -> 'US'
// 'uk' -> 'GB'
// '中华人民共和国' -> 'CN' (requires the Chinese alias to be registered in iso_country_lookup with a consistently generated alias_key)
// 'Korea, South' -> 'KR'
// 'U.S.' -> 'US'
  }
}
// 备注 Notes:
// - For strict validation, maintain a complete iso_country_master (~249 ISO-2 codes).
// - alias_key values in iso_country_lookup must be generated with the same canonicalization rule as the code (see aliasKey).
// - For small alias sets (thousands of rows) the broadcast Map performs well; for larger sets switch to a join:
//     df.join(broadcast(lookup), Seq("country_alias_key"), "left")
// - In production, update the target table via MERGE and keep a data-change audit log.
// - This implementation does not modify the original column; it only adds normalized and audit columns for traceability.
With a single reusable prompt, quickly generate clear, actionable data normalization rules for a specified table and column: 1) turn scattered naming, inconsistent formats, and outliers into executable cleaning standards; 2) let non-technical team members collaborate with a built-in "data engineer", shortening data preparation cycles; 3) support multilingual output so global teams share one standard; 4) iterate and reuse rules to build enterprise-level field standards and improve report and analysis accuracy; 5) replace large-scale manual cleaning at lower cost, helping data products, BI, and growth projects ship faster.
Quickly generate normalization rules for a specified table column, unifying date, numeric, and text formats to shorten cleaning and release cycles.
Agree on field standards before data ingestion to improve report accuracy and reduce manual fixes and repeated validation.
Build reviewable rule documents with examples to drive cross-department standardization, reducing compliance risk and disputes over metric definitions.
Copy the prompt generated from the template into the chat app you normally use (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suited to quick personal trials and lightweight scenarios.
Turn the prompt template into an API: your program can adjust the template parameters and call it directly through the interface, enabling automation and batch processing (a minimal call sketch follows below). Suited to developer integration and embedding into business systems.
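A minimal Python sketch of this calling pattern, assuming a hypothetical HTTP endpoint; the URL, token, and parameter names below are placeholders for illustration, not the provider's actual API contract:

import json
import urllib.request

# Hypothetical endpoint and payload; replace with the real prompt-API contract of your provider.
API_URL = "https://example.com/api/prompt-templates/data-normalization/render"
payload = {
    "table": "ecommerce_orders",   # target table (example)
    "column": "order_date",        # column to normalize (example)
    "output_language": "en",       # multilingual output option
}
req = urllib.request.Request(
    API_URL,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json", "Authorization": "Bearer <YOUR_TOKEN>"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read().decode("utf-8")))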
Configure the corresponding server address in your MCP client so that your AI application can invoke the prompt templates automatically. Suited to advanced users and team collaboration, letting prompts work seamlessly across different AI tools.