Popular roles are more than a source of inspiration; they are productivity assistants. With carefully curated role prompts you can quickly generate high-quality content, spark creative ideas, and find the solution that best fits your needs, making creation easier and the value more immediate.
We keep the role library updated for different user needs, so you can always find a suitable entry point for inspiration.
Generate Python scripts for integrating multiple data sources, with professional technical guidance.
Below is a runnable reference script that integrates four kinds of data sources (sales CRM, operations logs, support tickets, and a data warehouse) and outputs a standardized customer dimension table plus sales and interaction fact tables. The script follows an extract → standardize → entity-resolve → integrate → output flow and stays as vendor/system agnostic as possible, so it can be adapted to your actual environment.
Notes and assumptions
How to use
#!/usr/bin/env python3
"""
Integrate data from a sales CRM, operations logs, support tickets, and a data
warehouse, and output a unified customer dimension plus sales/interaction fact tables.

Environment variables (example):
    CRM_API_URL=https://your-crm.example.com/api
    CRM_API_TOKEN=xxxx
    TICKET_API_URL=https://your-ticket.example.com/api
    TICKET_API_TOKEN=yyyy
    OPS_LOG_PATH=./data/ops_logs/*.jsonl   # glob supported; files may be csv/jsonl
    DW_SQLALCHEMY_URL=postgresql+psycopg2://user:pwd@host:5432/dbname
    SINCE_DAYS=30

Run:
    python integrate_sources.py

Output:
    ./output/dim_customer.parquet (or csv)
    ./output/fact_sales.parquet (or csv)
    ./output/fact_interactions.parquet (or csv)
"""
import os
import re
import json
import glob
import math
import uuid
import time
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

import pandas as pd
import requests
from sqlalchemy import create_engine, text
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s",
)

UTC = timezone.utc

CRM_API_URL = os.getenv("CRM_API_URL", "").rstrip("/")
CRM_API_TOKEN = os.getenv("CRM_API_TOKEN", "")
TICKET_API_URL = os.getenv("TICKET_API_URL", "").rstrip("/")
TICKET_API_TOKEN = os.getenv("TICKET_API_TOKEN", "")
OPS_LOG_PATH = os.getenv("OPS_LOG_PATH", "./data/ops_logs/*.jsonl")
DW_SQLALCHEMY_URL = os.getenv("DW_SQLALCHEMY_URL", "")
SINCE_DAYS = int(os.getenv("SINCE_DAYS", "30"))
SINCE_DT = datetime.now(tz=UTC) - timedelta(days=SINCE_DAYS)

OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)
def normalize_email(email: Optional[str]) -> Optional[str]:
    if not email or not isinstance(email, str):
        return None
    e = email.strip().lower()
    # Basic sanity check only
    if "@" not in e:
        return None
    return e

def extract_domain_from_email(email: Optional[str]) -> Optional[str]:
    e = normalize_email(email)
    if not e:
        return None
    try:
        return e.split("@", 1)[1]
    except Exception:
        return None

def normalize_domain(domain: Optional[str]) -> Optional[str]:
    if not domain or not isinstance(domain, str):
        return None
    d = domain.strip().lower()
    d = re.sub(r"^https?://", "", d)
    d = d.split("/")[0]
    # Strip common prefixes
    for prefix in ["www.", "m."]:
        if d.startswith(prefix):
            d = d[len(prefix):]
    if "." not in d:
        return None
    return d

def extract_domain_from_url(url: Optional[str]) -> Optional[str]:
    if not url or not isinstance(url, str):
        return None
    return normalize_domain(url)

def coalesce_domain(email: Optional[str], website: Optional[str]) -> Optional[str]:
    d = extract_domain_from_email(email)
    if d:
        return d
    return extract_domain_from_url(website)

def parse_ts_to_utc(ts) -> Optional[pd.Timestamp]:
    if ts is None or (isinstance(ts, float) and math.isnan(ts)):
        return None
    try:
        t = pd.to_datetime(ts, utc=True, errors="coerce")
        if pd.isna(t):
            return None
        return t.tz_convert("UTC") if t.tzinfo else t.tz_localize("UTC")
    except Exception:
        return None

def stable_id_from_domain(domain: str) -> str:
    # Derive a stable (reproducible) UUID from the domain
    return uuid.uuid5(uuid.NAMESPACE_DNS, domain).hex

def safe_to_parquet_or_csv(df: pd.DataFrame, path_base: str):
    try:
        df.to_parquet(f"{path_base}.parquet", index=False)
        logging.info(f"written: {path_base}.parquet")
    except Exception as e:
        logging.warning(f"fall back to CSV for {path_base}: {e}")
        df.to_csv(f"{path_base}.csv", index=False)
        logging.info(f"written: {path_base}.csv")
def api_get_json(url: str, headers: Dict[str, str], params: Dict) -> dict:
    resp = requests.get(url, headers=headers, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()

def fetch_paginated_list(
    base_url: str,
    endpoint: str,
    headers: Dict[str, str],
    params: Dict,
    page_param: str = "page",
    per_page_param: str = "per_page",
    per_page: int = 200,
    data_path: Optional[List[str]] = None,
    max_pages: int = 100,
) -> List[dict]:
    """
    Generic paginated fetch. Adjust parameter names, paging logic, and data_path
    to match your actual API.
    data_path: path to the list inside the returned JSON, e.g. ["data", "items"].
    """
    out = []
    page = 1
    while page <= max_pages:
        p = dict(params)
        p[page_param] = page
        p[per_page_param] = per_page
        data = api_get_json(f"{base_url}{endpoint}", headers, p)
        # Locate the list of records
        items = data
        if data_path:
            for k in data_path:
                if isinstance(items, dict):
                    items = items.get(k, [])
                else:
                    items = []
                    break
            if not isinstance(items, list):
                items = []
        elif isinstance(items, dict):
            # If the structure is unknown, try common keys
            for k in ["data", "items", "results"]:
                if k in items and isinstance(items[k], list):
                    items = items[k]
                    break
            if isinstance(items, dict):
                items = []
        if not items:
            break
        out.extend(items)
        page += 1
        # Simple rate limiting; strengthen if needed
        time.sleep(0.1)
    return out
def fetch_crm_contacts(since_iso: str) -> pd.DataFrame:
    if not CRM_API_URL or not CRM_API_TOKEN:
        logging.warning("CRM config missing; skipping contacts extraction")
        return pd.DataFrame()
    headers = {"Authorization": f"Bearer {CRM_API_TOKEN}"}
    # Adjust endpoint, params, and data_path to your actual API
    items = fetch_paginated_list(
        base_url=CRM_API_URL,
        endpoint="/contacts",
        headers=headers,
        params={"updated_since": since_iso},
        data_path=None,
    )
    df = pd.json_normalize(items)
    # Field mapping: adjust to your system's actual fields.
    # Target fields: contact_id, contact_name, contact_email, account_name, account_website, updated_at
    field_map = {
        "id": "contact_id",
        "name": "contact_name",
        "email": "contact_email",
        "account.name": "account_name",
        "account.website": "account_website",
        "updated_at": "updated_at",
    }
    df = df.rename(columns={k: v for k, v in field_map.items() if k in df.columns})
    # Keep only the target fields
    keep = list(field_map.values())
    df = df[[c for c in keep if c in df.columns]].copy()
    return df

def fetch_crm_deals(since_iso: str) -> pd.DataFrame:
    if not CRM_API_URL or not CRM_API_TOKEN:
        logging.warning("CRM config missing; skipping deals extraction")
        return pd.DataFrame()
    headers = {"Authorization": f"Bearer {CRM_API_TOKEN}"}
    items = fetch_paginated_list(
        base_url=CRM_API_URL,
        endpoint="/deals",
        headers=headers,
        params={"updated_since": since_iso},
        data_path=None,
    )
    df = pd.json_normalize(items)
    # Field mapping: adjust to your system's actual fields.
    # Target fields: deal_id, deal_name, amount, stage, close_date, owner,
    #                account_name, account_website, contact_email, updated_at
    field_map = {
        "id": "deal_id",
        "name": "deal_name",
        "amount": "amount",
        "stage": "stage",
        "close_date": "close_date",
        "owner.name": "owner",
        "account.name": "account_name",
        "account.website": "account_website",
        "primary_contact.email": "contact_email",
        "updated_at": "updated_at",
    }
    df = df.rename(columns={k: v for k, v in field_map.items() if k in df.columns})
    keep = list(field_map.values())
    df = df[[c for c in keep if c in df.columns]].copy()
    return df

def fetch_tickets(since_iso: str) -> pd.DataFrame:
    if not TICKET_API_URL or not TICKET_API_TOKEN:
        logging.warning("Ticket config missing; skipping ticket extraction")
        return pd.DataFrame()
    headers = {"Authorization": f"Bearer {TICKET_API_TOKEN}"}
    items = fetch_paginated_list(
        base_url=TICKET_API_URL,
        endpoint="/tickets",
        headers=headers,
        params={"updated_since": since_iso},
        data_path=None,
    )
    df = pd.json_normalize(items)
    # Field mapping: adjust to your system's actual fields.
    # Target fields: ticket_id, subject, status, priority, created_at, updated_at,
    #                requester_email, assignee, organization, channel
    field_map = {
        "id": "ticket_id",
        "subject": "subject",
        "status": "status",
        "priority": "priority",
        "created_at": "created_at",
        "updated_at": "updated_at",
        "requester.email": "requester_email",
        "assignee.name": "assignee",
        "organization.name": "organization",
        "channel": "channel",
    }
    df = df.rename(columns={k: v for k, v in field_map.items() if k in df.columns})
    keep = list(field_map.values())
    df = df[[c for c in keep if c in df.columns]].copy()
    return df
def load_ops_logs(path_glob: str) -> pd.DataFrame:
    paths = glob.glob(path_glob)
    if not paths:
        logging.warning(f"No operations log files found: {path_glob}")
        return pd.DataFrame()
    frames = []
    for p in paths:
        if p.lower().endswith(".csv"):
            tmp = pd.read_csv(p)
        elif p.lower().endswith(".jsonl") or p.lower().endswith(".ndjson"):
            tmp = pd.read_json(p, lines=True)
        elif p.lower().endswith(".json"):
            tmp = pd.read_json(p)
        else:
            logging.warning(f"Unsupported file type, skipping: {p}")
            continue
        frames.append(tmp)
    if not frames:
        return pd.DataFrame()
    df = pd.concat(frames, ignore_index=True)
    # Field mapping: adjust to your actual log format.
    # Target fields: event_id, event_type, occurred_at, user_email, account_domain, metadata (json)
    # Try to map common field names
    candidate_map = {
        "id": "event_id",
        "event_id": "event_id",
        "type": "event_type",
        "event_type": "event_type",
        "timestamp": "occurred_at",
        "occurred_at": "occurred_at",
        "user.email": "user_email",
        "user_email": "user_email",
        "account.domain": "account_domain",
        "account_domain": "account_domain",
        "metadata": "metadata",
        "data": "metadata",
    }
    rename_map = {k: v for k, v in candidate_map.items() if k in df.columns}
    df = df.rename(columns=rename_map)
    keep = ["event_id", "event_type", "occurred_at", "user_email", "account_domain", "metadata"]
    df = df[[c for c in keep if c in df.columns]].copy()
    # Normalize metadata to a JSON string for easier persistence
    if "metadata" in df.columns:
        df["metadata"] = df["metadata"].apply(
            lambda x: json.dumps(x, ensure_ascii=False)
            if isinstance(x, (dict, list))
            else (x if isinstance(x, str) else None)
        )
    return df
def query_dw(sql: str) -> pd.DataFrame:
    if not DW_SQLALCHEMY_URL:
        logging.warning("DW_SQLALCHEMY_URL is not set; skipping data warehouse query")
        return pd.DataFrame()
    engine = create_engine(DW_SQLALCHEMY_URL)
    with engine.connect() as conn:
        df = pd.read_sql(text(sql), conn)
    return df

def fetch_dw_customers() -> pd.DataFrame:
    # Adjust the query to your actual customer master data table
    sql = """
        SELECT
            customer_id,
            customer_name,
            domain,       -- ideally maintain the primary domain in master data
            website,
            updated_at
        FROM dim_customer
    """
    try:
        return query_dw(sql)
    except Exception as e:
        logging.warning(f"Failed to query customer master data from the warehouse: {e}")
        return pd.DataFrame()
def standardize_crm_contacts(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "contact_email" in df.columns:
        df["contact_email"] = df["contact_email"].apply(normalize_email)
    if "account_website" in df.columns:
        df["account_domain"] = df["account_website"].apply(extract_domain_from_url)
    else:
        df["account_domain"] = None
    if "updated_at" in df.columns:
        df["updated_at"] = df["updated_at"].apply(parse_ts_to_utc)
    return df

def standardize_crm_deals(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "contact_email" in df.columns:
        df["contact_email"] = df["contact_email"].apply(normalize_email)
    # Prefer the domain from contact_email, fall back to account_website
    email_domain = (
        df["contact_email"].apply(extract_domain_from_email)
        if "contact_email" in df.columns
        else pd.Series([None] * len(df))
    )
    website_domain = (
        df["account_website"].apply(extract_domain_from_url)
        if "account_website" in df.columns
        else pd.Series([None] * len(df))
    )
    df["account_domain"] = email_domain.fillna(website_domain)
    if "close_date" in df.columns:
        df["close_date"] = df["close_date"].apply(parse_ts_to_utc)
    if "updated_at" in df.columns:
        df["updated_at"] = df["updated_at"].apply(parse_ts_to_utc)
    # Numeric normalization
    if "amount" in df.columns:
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    return df

def standardize_tickets(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "requester_email" in df.columns:
        df["requester_email"] = df["requester_email"].apply(normalize_email)
        df["requester_domain"] = df["requester_email"].apply(extract_domain_from_email)
    else:
        df["requester_domain"] = None
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = df[col].apply(parse_ts_to_utc)
    return df

def standardize_ops_logs(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "user_email" in df.columns:
        df["user_email"] = df["user_email"].apply(normalize_email)
        df["user_domain"] = df["user_email"].apply(extract_domain_from_email)
    else:
        df["user_domain"] = None
    if "account_domain" in df.columns:
        df["account_domain"] = df["account_domain"].apply(normalize_domain)
    if "occurred_at" in df.columns:
        df["occurred_at"] = df["occurred_at"].apply(parse_ts_to_utc)
    return df
def build_dim_customer(dw_customers: pd.DataFrame, crm_contacts: pd.DataFrame,
                       crm_deals: pd.DataFrame, tickets: pd.DataFrame,
                       ops_logs: pd.DataFrame) -> pd.DataFrame:
    """Build the customer dimension: prefer DW master data, enrich it with domains
    collected from CRM/tickets/logs, and generate stable IDs where needed."""
    candidates = []
# DW master data (preferred)
if not dw_customers.empty:
df = dw_customers.copy()
if "domain" in df.columns:
df["domain"] = df["domain"].apply(normalize_domain)
else:
df["domain"] = df["website"].apply(extract_domain_from_url) if "website" in df.columns else None
df = df[["customer_id", "customer_name", "domain", "website", "updated_at"]].copy()
candidates.append(df)
# CRM account domains (from contacts and deals)
if not crm_contacts.empty:
tmp = crm_contacts.copy()
tmp["domain"] = tmp["account_domain"]
tmp["customer_name"] = tmp.get("account_name")
tmp["website"] = tmp.get("account_website")
tmp = tmp[["customer_name", "domain", "website"]]
candidates.append(tmp)
if not crm_deals.empty:
tmp = crm_deals.copy()
tmp["domain"] = tmp["account_domain"]
tmp["customer_name"] = tmp.get("account_name")
tmp["website"] = tmp.get("account_website")
tmp = tmp[["customer_name", "domain", "website"]]
candidates.append(tmp)
# Ticket domains
if not tickets.empty:
tmp = tickets.copy()
tmp["domain"] = tmp["requester_domain"]
tmp["customer_name"] = tmp.get("organization")
tmp["website"] = None
tmp = tmp[["customer_name", "domain", "website"]]
candidates.append(tmp)
# Log domains
if not ops_logs.empty:
tmp = ops_logs.copy()
# Prefer account_domain, fall back to user_domain
tmp["domain"] = tmp["account_domain"].fillna(tmp["user_domain"]) if "account_domain" in tmp.columns else tmp["user_domain"]
tmp["customer_name"] = None
tmp["website"] = None
tmp = tmp[["customer_name", "domain", "website"]]
candidates.append(tmp)
if not candidates:
return pd.DataFrame(columns=["customer_id", "customer_name", "domain", "website", "source"])
all_cand = pd.concat(candidates, ignore_index=True)
all_cand["domain"] = all_cand["domain"].apply(normalize_domain)
all_cand = all_cand.dropna(subset=["domain"]).drop_duplicates(subset=["domain"]).reset_index(drop=True)
# Merge customer_id from DW master data; generate a stable ID from the domain where missing
if not dw_customers.empty and "customer_id" in dw_customers.columns:
dw_map = dw_customers.copy()
dw_map["domain"] = dw_map["domain"].apply(normalize_domain)
dw_map = dw_map.dropna(subset=["domain"])
dim = all_cand.merge(dw_map[["domain", "customer_id", "customer_name", "website"]], on="domain", how="left", suffixes=("", "_dw"))
# Prefer the DW name/website
dim["customer_name"] = dim["customer_name_dw"].fillna(dim["customer_name"])
dim["website"] = dim["website_dw"].fillna(dim["website"])
dim = dim.drop(columns=[c for c in dim.columns if c.endswith("_dw")])
else:
dim = all_cand.copy()
dim["customer_id"] = None
# Generate missing IDs
mask = dim["customer_id"].isna()
dim.loc[mask, "customer_id"] = dim.loc[mask, "domain"].apply(stable_id_from_domain)
# Normalize columns
dim["customer_name"] = dim["customer_name"].fillna(dim["domain"])
dim["website"] = dim["website"].apply(lambda x: x if isinstance(x, str) else None)
dim = dim[["customer_id", "customer_name", "domain", "website"]].copy()
return dim
def map_customer_id(df: pd.DataFrame, dim_customer: pd.DataFrame, domain_col: str, out_col: str = "customer_id") -> pd.DataFrame:
    if df.empty:
        df[out_col] = None
        return df
    tmp = df.copy()
    tmp = tmp.merge(dim_customer[["domain", "customer_id"]], left_on=domain_col, right_on="domain", how="left")
    tmp = tmp.drop(columns=["domain_y"], errors="ignore").rename(columns={"domain_x": domain_col})
    # Records without a domain match keep a missing customer_id
    return tmp
def build_fact_sales(crm_deals: pd.DataFrame, dim_customer: pd.DataFrame) -> pd.DataFrame:
    if crm_deals.empty:
        return pd.DataFrame(columns=["deal_id", "deal_name", "amount", "stage", "close_date", "owner", "customer_id", "account_domain"])
    df = crm_deals.copy()
    df = map_customer_id(df, dim_customer, domain_col="account_domain", out_col="customer_id")
    cols = ["deal_id", "deal_name", "amount", "stage", "close_date", "owner", "customer_id", "account_domain"]
    df = df[[c for c in cols if c in df.columns]]
    return df
def build_fact_interactions(tickets: pd.DataFrame, ops_logs: pd.DataFrame, dim_customer: pd.DataFrame) -> pd.DataFrame:
    frames = []
# Tickets -> interactions
if not tickets.empty:
t = tickets.copy()
t["interaction_type"] = "ticket"
t["occurred_at"] = t["created_at"] if "created_at" in t.columns else None
t["actor"] = t["requester_email"] if "requester_email" in t.columns else None
t["domain"] = t["requester_domain"]
t["payload"] = t.apply(lambda r: json.dumps({
"ticket_id": r.get("ticket_id"),
"subject": r.get("subject"),
"status": r.get("status"),
"priority": r.get("priority"),
"assignee": r.get("assignee"),
"channel": r.get("channel"),
}, ensure_ascii=False), axis=1)
t = map_customer_id(t, dim_customer, domain_col="domain", out_col="customer_id")
frames.append(t[["interaction_type", "occurred_at", "actor", "customer_id", "domain", "payload"]])
# Operations logs -> interactions
if not ops_logs.empty:
o = ops_logs.copy()
o["interaction_type"] = o.get("event_type", "log")
o["actor"] = o.get("user_email")
# Prefer account_domain, then user_domain
dom = o.get("account_domain")
if dom is None:
dom = o.get("user_domain")
o["domain"] = dom
# Keep metadata as the payload
o["payload"] = o.get("metadata")
o = map_customer_id(o, dim_customer, domain_col="domain", out_col="customer_id")
# Unify the occurred_at column name
if "occurred_at" not in o.columns:
o["occurred_at"] = None
frames.append(o[["interaction_type", "occurred_at", "actor", "customer_id", "domain", "payload"]])
if not frames:
return pd.DataFrame(columns=["interaction_type", "occurred_at", "actor", "customer_id", "domain", "payload"])
inter = pd.concat(frames, ignore_index=True)
return inter
def main():
    since_iso = SINCE_DT.isoformat()
    logging.info(f"Incremental extraction start time: {since_iso}")
# Extract
crm_contacts = fetch_crm_contacts(since_iso)
crm_deals = fetch_crm_deals(since_iso)
tickets = fetch_tickets(since_iso)
ops_logs = load_ops_logs(OPS_LOG_PATH)
dw_customers = fetch_dw_customers()
logging.info(f"CRM contacts: {len(crm_contacts)}, CRM deals: {len(crm_deals)}, tickets: {len(tickets)}, logs: {len(ops_logs)}, dw_customers: {len(dw_customers)}")
# Standardize
crm_contacts = standardize_crm_contacts(crm_contacts)
crm_deals = standardize_crm_deals(crm_deals)
tickets = standardize_tickets(tickets)
ops_logs = standardize_ops_logs(ops_logs)
# Entity resolution (customer dimension)
dim_customer = build_dim_customer(dw_customers, crm_contacts, crm_deals, tickets, ops_logs)
logging.info(f"dim_customer rows: {len(dim_customer)}")
# Fact tables
fact_sales = build_fact_sales(crm_deals, dim_customer)
fact_interactions = build_fact_interactions(tickets, ops_logs, dim_customer)
logging.info(f"fact_sales rows: {len(fact_sales)}, fact_interactions rows: {len(fact_interactions)}")
# Output
safe_to_parquet_or_csv(dim_customer, os.path.join(OUTPUT_DIR, "dim_customer"))
safe_to_parquet_or_csv(fact_sales, os.path.join(OUTPUT_DIR, "fact_sales"))
safe_to_parquet_or_csv(fact_interactions, os.path.join(OUTPUT_DIR, "fact_interactions"))
# To write the results back to the data warehouse, uncomment the block below and make sure DW_SQLALCHEMY_URL is writable and the target tables either match the schema or allow if_exists="replace"/"append"
"""
if DW_SQLALCHEMY_URL:
engine = create_engine(DW_SQLALCHEMY_URL)
with engine.begin() as conn:
dim_customer.to_sql("dim_customer_unified", conn, if_exists="replace", index=False)
fact_sales.to_sql("fact_sales_unified", conn, if_exists="replace", index=False)
fact_interactions.to_sql("fact_interactions_unified", conn, if_exists="replace", index=False)
logging.info("Unified tables written back to the data warehouse")
"""
logging.info("Integration pipeline finished")
Implementation and validation suggestions
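As a starting point, the sketch below (an assumption-laden example: it expects the default Parquet files under ./output produced by the script above; switch to read_csv if the CSV fallback was used) checks key uniqueness in dim_customer, reports the share of fact rows that did not resolve to a customer, and verifies referential integrity between the fact tables and the dimension. Adapt the checks to your own data contracts.

# Post-run sanity checks for the script's outputs (hypothetical file names)
import pandas as pd

dim = pd.read_parquet("./output/dim_customer.parquet")
sales = pd.read_parquet("./output/fact_sales.parquet")
inter = pd.read_parquet("./output/fact_interactions.parquet")

# 1) One row per domain and no null keys in the dimension
assert dim["domain"].notna().all(), "dim_customer has null domains"
assert not dim["domain"].duplicated().any(), "dim_customer has duplicate domains"
assert dim["customer_id"].notna().all(), "dim_customer has null customer_id"

# 2) Share of fact rows that failed to resolve to a customer
for name, df in [("fact_sales", sales), ("fact_interactions", inter)]:
    if len(df):
        print(f"{name}: {len(df)} rows, {df['customer_id'].isna().mean():.1%} unmatched")

# 3) Referential integrity: matched customer_ids must exist in the dimension
known = set(dim["customer_id"])
assert sales["customer_id"].dropna().isin(known).all()
assert inter["customer_id"].dropna().isin(known).all()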
Below is a self-contained Python script that integrates three sources—instrumented tracking events, survey results, and user feedback—into clean, analysis-ready datasets. It focuses on robust preprocessing, schema normalization, timestamp unification (to UTC), deduplication, safe handling of potential PII via hashing, and time-aware joins (nearest “backward” match within a configurable window). It produces two outputs: an event-level integrated table and a user-level feature table.
Key assumptions
How to run
Save as integrate_data.py
Example: python integrate_data.py --events path/to/events.csv --surveys path/to/surveys.jsonl --feedback path/to/feedback.parquet --outdir ./out --window-days 7 --default-tz "Asia/Shanghai" --salt "your_salt" --config path/to/column_mapping.json
If you don’t provide a config, sensible defaults are attempted (see DEFAULT_CONFIG in the script).
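To make the "nearest backward match within a window" behavior concrete before the full script, here is a small illustration on made-up data (the column names mirror the canonical names used below; in the script itself this logic is wrapped in nearest_backward_join):

import pandas as pd

events = pd.DataFrame({
    "user_id": ["u1", "u1", "u2"],
    "event_ts": pd.to_datetime(["2024-01-10", "2024-01-20", "2024-01-15"], utc=True),
})
surveys = pd.DataFrame({
    "user_id": ["u1", "u2"],
    "survey_ts": pd.to_datetime(["2024-01-08", "2024-01-01"], utc=True),
    "nps_score": [9, 6],
})

# Each event is matched to the most recent survey from the same user at or
# before the event time, but only if it falls inside the 7-day window.
joined = pd.merge_asof(
    events.sort_values("event_ts"),
    surveys.sort_values("survey_ts"),
    left_on="event_ts",
    right_on="survey_ts",
    by="user_id",
    direction="backward",
    tolerance=pd.Timedelta(days=7),
)
print(joined[["user_id", "event_ts", "nps_score"]])
# u1's 2024-01-10 event picks up the 2024-01-08 survey; the other two events
# have no survey within the window, so nps_score stays NaN for them.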
Script: integrate_data.py
#!/usr/bin/env python3
"""
Integrate tracking events, survey results, and user feedback into:
  1) an event-level integrated table (integrated_events)
  2) a user-level feature table (user_features)

Dependencies: pandas, numpy (pyarrow optional, for Parquet output).

Notes: column mappings default to DEFAULT_CONFIG below; override them via --config.
"""
import argparse
import hashlib
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

DEFAULT_CONFIG = {
    "events": {
        # Rename your source columns to these canonical names
        "user_id": "user_id",
        "timestamp": "event_time",
        "event_name": "event_name",
        # event_properties can be a JSON string or dict with extra fields
        "event_properties": "event_properties"
    },
    "surveys": {
        "user_id": "user_id",
        "timestamp": "submitted_at",
        # Provide numeric score columns if available (e.g., Likert or 0-10)
        "score_cols": [],   # e.g., ["q1_score", "q2_score"]
        "nps_col": None     # e.g., "nps_score"
    },
    "feedback": {
        "user_id": "user_id",
        "timestamp": "created_at",
        "text_col": "feedback_text",
        "rating_col": None  # e.g., "star_rating"
    },
    "pii": {
        # Any PII columns present in any source will be hashed with SHA-256 + salt
        "emails": ["email", "user_email"],
        "phones": ["phone", "mobile"],
        "other": []         # add any other PII columns if needed
    }
}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)
logger = logging.getLogger("integrator")
def load_table(path: Path) -> pd.DataFrame:
    """Load a table from CSV, JSON/JSONL, or Parquet."""
    suffix = path.suffix.lower()
    if suffix in [".csv"]:
        return pd.read_csv(path)
    if suffix in [".parquet", ".pq"]:
        return pd.read_parquet(path)
    if suffix in [".json", ".jsonl"]:
        # Try JSON Lines first
        try:
            return pd.read_json(path, lines=True)
        except Exception:
            # Fallback to standard JSON array
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                return pd.json_normalize(data)
            elif isinstance(data, dict):
                # Try to find a list under a top-level key
                for k, v in data.items():
                    if isinstance(v, list):
                        return pd.json_normalize(v)
                # As a last resort, make a single-row frame
                return pd.json_normalize(data)
    raise ValueError(f"Unsupported file format for: {path}")

def write_output(df: pd.DataFrame, out_path: Path) -> None:
    """Write DataFrame to Parquet if available, else CSV."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        df.to_parquet(out_path.with_suffix(".parquet"), index=False)
        logger.info(f"Wrote {out_path.with_suffix('.parquet')}")
    except Exception as e:
        logger.warning(f"Parquet write failed ({e}); falling back to CSV.")
        df.to_csv(out_path.with_suffix(".csv"), index=False)
        logger.info(f"Wrote {out_path.with_suffix('.csv')}")

def enforce_str_id(df: pd.DataFrame, col: str) -> pd.DataFrame:
    if col not in df.columns:
        raise KeyError(f"Missing required id column: {col}")
    df[col] = df[col].astype("string")
    return df

def to_utc(df: pd.DataFrame, ts_col: str, default_tz: str) -> pd.Series:
    if ts_col not in df.columns:
        raise KeyError(f"Missing required timestamp column: {ts_col}")
    s = pd.to_datetime(df[ts_col], errors="coerce", utc=False)
    # Check if tz-aware
    tz_aware = hasattr(s.dtype, "tz") and s.dtype.tz is not None
    if not tz_aware:
        # Localize naive timestamps then convert to UTC
        try:
            s = s.dt.tz_localize(default_tz, nonexistent="shift_forward", ambiguous="NaT")
        except TypeError:
            # Older pandas may not support nonexistent/ambiguous kwargs
            s = s.dt.tz_localize(default_tz)
    s = s.dt.tz_convert("UTC")
    return s

def parse_event_properties(df: pd.DataFrame, prop_col: str) -> pd.DataFrame:
    if prop_col not in df.columns:
        return df

    # If stringified JSON, try parsing and expanding
    def _parse(x):
        if isinstance(x, dict):
            return x
        if isinstance(x, str):
            x = x.strip()
            if x.startswith("{") and x.endswith("}"):
                try:
                    return json.loads(x)
                except Exception:
                    return None
        return None
props = df[prop_col].map(_parse)
if props.notna().any():
props_df = pd.json_normalize(props)
props_df.columns = [f"prop_{c}" for c in props_df.columns]
df = pd.concat([df.drop(columns=[prop_col]), props_df], axis=1)
return df
def hash_series(s: pd.Series, salt: str) -> pd.Series:
    def _hash(x):
        if pd.isna(x):
            return pd.NA
        m = hashlib.sha256()
        m.update(salt.encode("utf-8"))
        m.update(str(x).encode("utf-8"))
        return m.hexdigest()
    return s.astype("string").map(_hash)

def hash_pii(df: pd.DataFrame, pii_cfg: Dict, salt: str) -> pd.DataFrame:
    pii_cols = pii_cfg.get("emails", []) + pii_cfg.get("phones", []) + pii_cfg.get("other", [])
    if not salt:
        if any(col in df.columns for col in pii_cols):
            logger.warning("No salt provided; PII columns will remain unhashed.")
        return df
    for col in pii_cols:
        if col in df.columns:
            df[col] = hash_series(df[col], salt)
    return df
def clean_text(s: pd.Series) -> pd.Series:
    # Simple normalization useful for feedback text
    url_re = re.compile(r"https?://\S+|www\.\S+")
    s = s.astype("string")
    s = s.str.replace(url_re, "", regex=True)
    s = s.str.replace(r"\s+", " ", regex=True).str.strip()
    return s
def deduplicate_events(df: pd.DataFrame, user_col: str, ts_col: str, name_col: str) -> pd.DataFrame:
    subset = [c for c in [user_col, ts_col, name_col] if c in df.columns]
    if not subset:
        return df
    df = df.drop_duplicates(subset=subset, keep="first")
    return df

def prepare_events(events: pd.DataFrame, cfg: Dict, default_tz: str, salt: str) -> pd.DataFrame:
    remap = cfg["events"]
    events = events.rename(columns={
        remap.get("user_id", "user_id"): "user_id",
        remap.get("timestamp", "event_time"): "event_time",
        remap.get("event_name", "event_name"): "event_name",
        remap.get("event_properties", "event_properties"): "event_properties"
    })
    events = enforce_str_id(events, "user_id")
    events["event_ts"] = to_utc(events, "event_time", default_tz)
    events = parse_event_properties(events, "event_properties")
    events["event_name"] = events.get("event_name", pd.Series(index=events.index, dtype="string")).astype("string")
    events = hash_pii(events, cfg.get("pii", {}), salt)
    events = deduplicate_events(events, "user_id", "event_ts", "event_name")
    # Optimize dtypes
    events["event_name"] = events["event_name"].astype("category")
    events = events.drop(columns=[c for c in ["event_time"] if c in events.columns])
    return events
def prepare_surveys(surveys: pd.DataFrame, cfg: Dict, default_tz: str, salt: str) -> pd.DataFrame:
    remap = cfg["surveys"]
    surveys = surveys.rename(columns={
        remap.get("user_id", "user_id"): "user_id",
        remap.get("timestamp", "submitted_at"): "survey_time"
    })
    surveys = enforce_str_id(surveys, "user_id")
    surveys["survey_ts"] = to_utc(surveys, "survey_time", default_tz)
    surveys = hash_pii(surveys, cfg.get("pii", {}), salt)
    # Convert specified score columns to numeric if present
    score_cols = remap.get("score_cols", []) or []
    for col in score_cols:
        if col in surveys.columns:
            surveys[f"score_{col}"] = pd.to_numeric(surveys[col], errors="coerce")
    nps_col = remap.get("nps_col")
    if nps_col and nps_col in surveys.columns:
        surveys["nps_score"] = pd.to_numeric(surveys[nps_col], errors="coerce")

        # Optionally derive NPS category
        def _nps_bucket(x):
            if pd.isna(x):
                return pd.NA
            x = float(x)
            if x >= 9:
                return "promoter"
            if x >= 7:
                return "passive"
            return "detractor"

        surveys["nps_bucket"] = surveys["nps_score"].map(_nps_bucket).astype("category")
    surveys = surveys.drop(columns=[c for c in ["survey_time"] if c in surveys.columns])
    return surveys

def prepare_feedback(feedback: pd.DataFrame, cfg: Dict, default_tz: str, salt: str) -> pd.DataFrame:
    remap = cfg["feedback"]
    feedback = feedback.rename(columns={
        remap.get("user_id", "user_id"): "user_id",
        remap.get("timestamp", "created_at"): "feedback_time",
        remap.get("text_col", "feedback_text"): "feedback_text",
        remap.get("rating_col", "rating"): "feedback_rating"
    })
    feedback = enforce_str_id(feedback, "user_id")
    feedback["feedback_ts"] = to_utc(feedback, "feedback_time", default_tz)
    if "feedback_text" in feedback.columns:
        feedback["feedback_text"] = clean_text(feedback["feedback_text"])
        feedback["feedback_char_len"] = feedback["feedback_text"].str.len()
        feedback["feedback_word_count"] = feedback["feedback_text"].str.split().map(
            lambda x: len(x) if isinstance(x, list) else pd.NA
        )
    if "feedback_rating" in feedback.columns:
        feedback["feedback_rating"] = pd.to_numeric(feedback["feedback_rating"], errors="coerce")
    feedback = hash_pii(feedback, cfg.get("pii", {}), salt)
    feedback = feedback.drop(columns=[c for c in ["feedback_time"] if c in feedback.columns])
    return feedback
def nearest_backward_join(
    left: pd.DataFrame,
    right: pd.DataFrame,
    left_ts: str,
    right_ts: str,
    by: str,
    tolerance: pd.Timedelta,
    suffix: str
) -> pd.DataFrame:
    if right.empty:
        # No-op if right side is empty
        for c in right.columns:
            if c not in [by, right_ts]:
                left[f"{c}{suffix}"] = pd.NA
        return left
    # merge_asof requires the "on" keys to be globally sorted, so sort by the
    # timestamp columns (this also keeps each user's rows in time order)
    left = left.sort_values(left_ts)
    right = right.sort_values(right_ts)
    # Ensure same dtypes for 'by'
    left[by] = left[by].astype("string")
    right[by] = right[by].astype("string")
    merged = pd.merge_asof(
        left,
        right,
        left_on=left_ts,
        right_on=right_ts,
        by=by,
        direction="backward",
        tolerance=tolerance,
        suffixes=("", suffix)
    )
    # merge_asof applies the suffixes only to overlapping column names;
    # non-overlapping right-hand columns keep their original names.
    return merged
def build_user_features(
    events: pd.DataFrame,
    enriched_events: pd.DataFrame,
    surveys_cfg: Dict
) -> pd.DataFrame:
    # User-level aggregates from the event stream
    g = events.groupby("user_id", observed=True)
    user_feat = g.size().rename("total_events").reset_index()
    # Active days and first/last event timestamps
    if "event_ts" in events.columns:
        user_days = (
            events.assign(day=events["event_ts"].dt.date)
            .groupby("user_id", observed=True)["day"]
            .nunique()
            .rename("active_days")
        )
        user_feat = user_feat.merge(user_days, on="user_id", how="left")
        first_last = g["event_ts"].agg(["min", "max"]).rename(
            columns={"min": "first_event_ts", "max": "last_event_ts"}
        ).reset_index()
        user_feat = user_feat.merge(first_last, on="user_id", how="left")
    # Survey aggregates if present on enriched events
    score_cols = surveys_cfg.get("score_cols", []) or []
    for col in score_cols:
        colname = f"score_{col}"
        if colname in enriched_events.columns:
            agg = enriched_events.groupby("user_id", observed=True)[colname].mean().rename(f"avg_{colname}")
            user_feat = user_feat.merge(agg, on="user_id", how="left")
    if "nps_score" in enriched_events.columns:
        nps_agg = enriched_events.groupby("user_id", observed=True)["nps_score"].mean().rename("avg_nps_score")
        user_feat = user_feat.merge(nps_agg, on="user_id", how="left")
    if "nps_bucket" in enriched_events.columns:
        # Distribution by bucket
        nps_counts = pd.crosstab(enriched_events["user_id"], enriched_events["nps_bucket"]).add_prefix("nps_")
        user_feat = user_feat.merge(nps_counts.reset_index(), on="user_id", how="left")
    # Feedback aggregates (if text/rating exist)
    if "feedback_rating" in enriched_events.columns:
        fb_rating = enriched_events.groupby("user_id", observed=True)["feedback_rating"].mean().rename("avg_feedback_rating")
        user_feat = user_feat.merge(fb_rating, on="user_id", how="left")
    if "feedback_char_len" in enriched_events.columns:
        fb_len = enriched_events.groupby("user_id", observed=True)["feedback_char_len"].mean().rename("avg_feedback_char_len")
        user_feat = user_feat.merge(fb_len, on="user_id", how="left")
    return user_feat
def main():
    parser = argparse.ArgumentParser(description="Integrate events, surveys, and feedback.")
    parser.add_argument("--events", required=True, type=Path, help="Path to events file (csv/json/jsonl/parquet)")
    parser.add_argument("--surveys", required=True, type=Path, help="Path to surveys file (csv/json/jsonl/parquet)")
    parser.add_argument("--feedback", required=True, type=Path, help="Path to feedback file (csv/json/jsonl/parquet)")
    parser.add_argument("--outdir", required=True, type=Path, help="Output directory")
    parser.add_argument("--config", type=Path, default=None, help="Path to JSON config for column mapping")
    parser.add_argument("--window-days", type=float, default=7.0, help="Backward join window in days for survey/feedback")
    parser.add_argument("--default-tz", type=str, default="UTC", help="Timezone to localize naive timestamps (e.g., 'Asia/Shanghai')")
    parser.add_argument("--salt", type=str, default="", help="Salt for hashing PII columns. If empty, PII not hashed.")
    args = parser.parse_args()
# Load config
if args.config and args.config.exists():
with open(args.config, "r", encoding="utf-8") as f:
cfg = json.load(f)
# Merge with defaults so missing keys fall back
def deep_update(base, upd):
for k, v in upd.items():
if isinstance(v, dict) and isinstance(base.get(k), dict):
deep_update(base[k], v)
else:
base[k] = v
cfg_full = json.loads(json.dumps(DEFAULT_CONFIG)) # deep copy
deep_update(cfg_full, cfg)
cfg = cfg_full
else:
cfg = DEFAULT_CONFIG
# Load sources
logger.info("Loading input tables...")
events = load_table(args.events)
surveys = load_table(args.surveys)
feedback = load_table(args.feedback)
logger.info("Preprocessing events...")
events = prepare_events(events, cfg, args.default_tz, args.salt)
logger.info(f"Events shape after prep: {events.shape}")
logger.info("Preprocessing surveys...")
surveys = prepare_surveys(surveys, cfg, args.default_tz, args.salt)
logger.info(f"Surveys shape after prep: {surveys.shape}")
logger.info("Preprocessing feedback...")
feedback = prepare_feedback(feedback, cfg, args.default_tz, args.salt)
logger.info(f"Feedback shape after prep: {feedback.shape}")
# Nearest backward joins on user_id within tolerance
tolerance = pd.Timedelta(days=args.window_days)
logger.info(f"Joining surveys to events with backward tolerance {tolerance}...")
events_enriched = nearest_backward_join(
left=events,
right=surveys,
left_ts="event_ts",
right_ts="survey_ts",
by="user_id",
tolerance=tolerance,
suffix="_survey"
)
logger.info(f"Joining feedback to events with backward tolerance {tolerance}...")
events_enriched = nearest_backward_join(
left=events_enriched,
right=feedback,
left_ts="event_ts",
right_ts="feedback_ts",
by="user_id",
tolerance=tolerance,
suffix="_feedback"
)
# Output 1: event-level integrated dataset
out_events = args.outdir / "integrated_events"
write_output(events_enriched, out_events)
# Output 2: user-level features
logger.info("Building user-level features...")
user_features = build_user_features(events, events_enriched, cfg.get("surveys", {}))
out_users = args.outdir / "user_features"
write_output(user_features, out_users)
logger.info("Done.")
if __name__ == "__main__":
    main()
Notes and guidance
{
  "events": {
    "user_id": "uid",
    "timestamp": "ts",
    "event_name": "name",
    "event_properties": "props"
  },
  "surveys": {
    "user_id": "uid",
    "timestamp": "submitted_at",
    "score_cols": ["q1_score", "q2_score"],
    "nps_col": "nps"
  },
  "feedback": {
    "user_id": "uid",
    "timestamp": "created_at",
    "text_col": "comment",
    "rating_col": "stars"
  },
  "pii": {
    "emails": ["email"],
    "phones": ["phone"],
    "other": ["full_name"]
  }
}
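One quick follow-up check after a run is to measure how many events actually received a survey or feedback match within the chosen window. Below is a minimal sketch, assuming the example command above was used (Parquet output under ./out); low coverage usually points to a too-narrow --window-days, a wrong --default-tz for naive timestamps, or mismatched user_id mappings.

import pandas as pd

enriched = pd.read_parquet("./out/integrated_events.parquet")

# Share of events that found a survey / feedback record within the window
survey_rate = enriched["survey_ts"].notna().mean() if "survey_ts" in enriched.columns else 0.0
feedback_rate = enriched["feedback_ts"].notna().mean() if "feedback_ts" in enriched.columns else 0.0
print(f"survey match rate: {survey_rate:.1%}, feedback match rate: {feedback_rate:.1%}")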
With a single high-quality prompt, you can have an AI work like a data-mining expert and, within minutes, produce a runnable, reusable, hand-over-ready Python data-integration script. It merges data from multiple sources (spreadsheets, databases, online services, logs, cloud storage, and more) in one pass, automatically handling field mapping, deduplication, missing-value treatment, and time alignment, and it comes with clear operating instructions and a quality-validation checklist. The goal is to cut manual stitching and communication costs, reduce error rates, shorten report and dashboard turnaround, and build up a reusable library of integration templates that helps teams move quickly from trial to production and large-scale reuse.
Quickly generate integration scripts that bring sales, operations, and customer-support data onto a consistent definition, build a stable intermediate layer, speed up data delivery, and reduce rework.
Merge ad-spend, channel, and on-site behavior data with automatic deduplication and attribution to quickly produce conversion-funnel and ROI base tables, shortening evaluation and review cycles.
Connect tracking events, surveys, and user feedback, clean anomalous and stale events, and output the datasets needed for retention, segmentation, and feature-usage analysis.
Copy the prompt generated from the template into your usual chat app (ChatGPT, Claude, etc.) and use it directly in conversation, with no extra development. Suitable for quick personal trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it through the interface, making automation and batch processing easy. Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so your AI application can call the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly across different AI tools.