Generate Python scripts for integrating multiple data sources, with professional technical guidance.
Below is a runnable reference script that integrates four types of data sources (sales CRM, operations logs, support tickets, and a data warehouse) and outputs standardized dimension and fact tables. The script follows an extract, standardize, entity-resolve, integrate, output flow and stays as vendor/system agnostic as possible, so it can be adapted to your actual environment as needed.

Notes and assumptions
- Dependencies: pandas, requests, SQLAlchemy, python-dateutil (pandas' built-in parsing also works), and optionally pyarrow for writing Parquet; if pyarrow is not installed, the output falls back to CSV.
- Authentication and connection details are configured via environment variables to avoid hard-coding credentials.
- API endpoints and field names differ across systems; the script provides field mappings and comments so you can adjust them to your actual schema.
- Entity resolution uses a deterministic-match-first strategy: customer master data from the data warehouse is used when available; otherwise the domain (taken from email addresses or company websites) serves as the anchor and a stable customer ID is generated from it. This strategy is transparent and reproducible, which makes it easy to audit and iterate on.
- Outputs:
  - dim_customer: customer dimension table (deduplicated, stable IDs)
  - fact_sales: sales opportunity/order facts (with customer ID)
  - fact_interactions: interaction facts (operations logs and support tickets unified as "interactions")
- If you need to write the results back to the data warehouse, enable the to_sql block in the example (driven by the SQLAlchemy URL).

Usage
- Set the environment variables and run: python integrate_sources.py
- Review and adjust the "field mapping" sections so they match your actual data.
- Before production use, run against a small sample in a development environment to validate coverage and correctness.

Code
------------------------------------------------------------
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Integrate data from the sales CRM, operations logs, support tickets, and the data
warehouse; output a unified customer dimension table plus sales/interaction fact tables.

Environment variables (example):
  # CRM
  CRM_API_URL=https://your-crm.example.com/api
  CRM_API_TOKEN=xxxx

  # Support tickets
  TICKET_API_URL=https://your-ticket.example.com/api
  TICKET_API_TOKEN=yyyy

  # Operations logs
  OPS_LOG_PATH=./data/ops_logs/*.jsonl   # glob supported; csv/jsonl accepted

  # Data warehouse (read-only or read-write)
  DW_SQLALCHEMY_URL=postgresql+psycopg2://user:pwd@host:5432/dbname

  # Incremental window
  SINCE_DAYS=30

Run:
  python integrate_sources.py

Outputs:
  ./output/dim_customer.parquet (or csv)
  ./output/fact_sales.parquet (or csv)
  ./output/fact_interactions.parquet (or csv)
"""

import os
import re
import json
import glob
import math
import uuid
import time
import logging
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Optional

import pandas as pd
import requests
from sqlalchemy import create_engine, text

# -------------------------
# Global settings
# -------------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(message)s"
)
UTC = timezone.utc

# -------------------------
# Configuration
# -------------------------
CRM_API_URL = os.getenv("CRM_API_URL", "").rstrip("/")
CRM_API_TOKEN = os.getenv("CRM_API_TOKEN", "")
TICKET_API_URL = os.getenv("TICKET_API_URL", "").rstrip("/")
TICKET_API_TOKEN = os.getenv("TICKET_API_TOKEN", "")
OPS_LOG_PATH = os.getenv("OPS_LOG_PATH", "./data/ops_logs/*.jsonl")
DW_SQLALCHEMY_URL = os.getenv("DW_SQLALCHEMY_URL", "")
SINCE_DAYS = int(os.getenv("SINCE_DAYS", "30"))
SINCE_DT = datetime.now(tz=UTC) - timedelta(days=SINCE_DAYS)

OUTPUT_DIR = "./output"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# -------------------------
# Utilities: cleaning / normalization
# -------------------------
def normalize_email(email: Optional[str]) -> Optional[str]:
    if not email or not isinstance(email, str):
        return None
    e = email.strip().lower()
    # Basic sanity check
    if "@" not in e:
        return None
    return e


def extract_domain_from_email(email: Optional[str]) -> Optional[str]:
    e = normalize_email(email)
    if not e:
        return None
    try:
        return e.split("@", 1)[1]
    except Exception:
        return None


def normalize_domain(domain: Optional[str]) -> Optional[str]:
    if not domain or not isinstance(domain, str):
        return None
    d = domain.strip().lower()
    d = re.sub(r"^https?://", "", d)
    d = d.split("/")[0]
    # Strip common prefixes
    for prefix in ["www.", "m."]:
        if d.startswith(prefix):
            d = d[len(prefix):]
    if "." not in d:
        return None
    return d


def extract_domain_from_url(url: Optional[str]) -> Optional[str]:
    if not url or not isinstance(url, str):
        return None
    return normalize_domain(url)


def coalesce_domain(email: Optional[str], website: Optional[str]) -> Optional[str]:
    d = extract_domain_from_email(email)
    if d:
        return d
    return extract_domain_from_url(website)


def parse_ts_to_utc(ts) -> Optional[pd.Timestamp]:
    if ts is None or (isinstance(ts, float) and math.isnan(ts)):
        return None
    try:
        t = pd.to_datetime(ts, utc=True, errors="coerce")
        if pd.isna(t):
            return None
        return t.tz_convert("UTC") if t.tzinfo else t.tz_localize("UTC")
    except Exception:
        return None


def stable_id_from_domain(domain: str) -> str:
    # Generate a stable (reproducible) UUID from the domain
    return uuid.uuid5(uuid.NAMESPACE_DNS, domain).hex


def safe_to_parquet_or_csv(df: pd.DataFrame, path_base: str):
    try:
        df.to_parquet(f"{path_base}.parquet", index=False)
        logging.info(f"written: {path_base}.parquet")
    except Exception as e:
        logging.warning(f"fall back to CSV for {path_base}: {e}")
        df.to_csv(f"{path_base}.csv", index=False)
        logging.info(f"written: {path_base}.csv")

# -------------------------
# Extraction: generic API helpers
# -------------------------
def api_get_json(url: str, headers: Dict[str, str], params: Dict) -> dict:
    resp = requests.get(url, headers=headers, params=params, timeout=30)
    resp.raise_for_status()
    return resp.json()


def fetch_paginated_list(
    base_url: str,
    endpoint: str,
    headers: Dict[str, str],
    params: Dict,
    page_param: str = "page",
    per_page_param: str = "per_page",
    per_page: int = 200,
    data_path: Optional[List[str]] = None,
    max_pages: int = 100
) -> List[dict]:
    """
    Generic paginated fetch. Adjust the parameter names, pagination logic, and
    data_path to your actual API.
    data_path: path used to pull the list out of the returned JSON, e.g. ["data", "items"].
    """
    out = []
    page = 1
    while page <= max_pages:
        p = dict(params)
        p[page_param] = page
        p[per_page_param] = per_page
        data = api_get_json(f"{base_url}{endpoint}", headers, p)
        # Locate the list of items
        items = data
        if data_path:
            for k in data_path:
                items = items.get(k, [])
                if not isinstance(items, list):
                    items = []
                    break
        elif isinstance(items, dict):
            # If the structure is unknown, try common keys
            for k in ["data", "items", "results"]:
                if k in items and isinstance(items[k], list):
                    items = items[k]
                    break
            if isinstance(items, dict):
                items = []
        if not items:
            break
        out.extend(items)
        page += 1
        # Simple rate limiting; strengthen if needed
        time.sleep(0.1)
    return out

# -------------------------
# Extraction: CRM
# -------------------------
def fetch_crm_contacts(since_iso: str) -> pd.DataFrame:
    if not CRM_API_URL or not CRM_API_TOKEN:
        logging.warning("CRM configuration missing; skipping contacts extraction")
        return pd.DataFrame()
    headers = {"Authorization": f"Bearer {CRM_API_TOKEN}"}
    # Adjust the endpoint, parameters, and data_path to your actual API
    items = fetch_paginated_list(
        base_url=CRM_API_URL,
        endpoint="/contacts",
        headers=headers,
        params={"updated_since": since_iso},
        data_path=None
    )
    df = pd.json_normalize(items)
    # Field mapping: adjust to your actual system fields
    # Target fields: contact_id, contact_name, contact_email, account_name, account_website, updated_at
    field_map = {
        "id": "contact_id",
        "name": "contact_name",
        "email": "contact_email",
        "account.name": "account_name",
        "account.website": "account_website",
        "updated_at": "updated_at",
    }
    df = df.rename(columns={k: v for k, v in field_map.items() if k in df.columns})
    # Keep only the target fields
    keep = list(field_map.values())
    df = df[[c for c in keep if c in df.columns]].copy()
    return df


def fetch_crm_deals(since_iso: str) -> pd.DataFrame:
    if not CRM_API_URL or not CRM_API_TOKEN:
        logging.warning("CRM configuration missing; skipping deals extraction")
        return pd.DataFrame()
    headers = {"Authorization": f"Bearer {CRM_API_TOKEN}"}
    items = fetch_paginated_list(
        base_url=CRM_API_URL,
        endpoint="/deals",
        headers=headers,
        params={"updated_since": since_iso},
        data_path=None
    )
    df = pd.json_normalize(items)
    # Field mapping: adjust to your actual system fields
    # Target fields: deal_id, deal_name, amount, stage, close_date, owner, account_name,
    #                account_website, contact_email, updated_at
    field_map = {
        "id": "deal_id",
        "name": "deal_name",
        "amount": "amount",
        "stage": "stage",
        "close_date": "close_date",
        "owner.name": "owner",
        "account.name": "account_name",
        "account.website": "account_website",
        "primary_contact.email": "contact_email",
        "updated_at": "updated_at",
    }
    df = df.rename(columns={k: v for k, v in field_map.items() if k in df.columns})
    keep = list(field_map.values())
    df = df[[c for c in keep if c in df.columns]].copy()
    return df

# -------------------------
# Extraction: support tickets
# -------------------------
def fetch_tickets(since_iso: str) -> pd.DataFrame:
    if not TICKET_API_URL or not TICKET_API_TOKEN:
        logging.warning("Ticket configuration missing; skipping ticket extraction")
        return pd.DataFrame()
    headers = {"Authorization": f"Bearer {TICKET_API_TOKEN}"}
    items = fetch_paginated_list(
        base_url=TICKET_API_URL,
        endpoint="/tickets",
        headers=headers,
        params={"updated_since": since_iso},
        data_path=None
    )
    df = pd.json_normalize(items)
    # Field mapping: adjust to your actual system fields
    # Target fields: ticket_id, subject, status, priority, created_at, updated_at,
    #                requester_email, assignee, organization, channel
    field_map = {
        "id": "ticket_id",
        "subject": "subject",
        "status": "status",
        "priority": "priority",
        "created_at": "created_at",
        "updated_at": "updated_at",
        "requester.email": "requester_email",
        "assignee.name": "assignee",
        "organization.name": "organization",
        "channel": "channel",
    }
    df = df.rename(columns={k: v for k, v in field_map.items() if k in df.columns})
    keep = list(field_map.values())
    df = df[[c for c in keep if c in df.columns]].copy()
    return df

# -------------------------
# Extraction: operations logs (local/object storage), CSV/JSONL supported
# -------------------------
def load_ops_logs(path_glob: str) -> pd.DataFrame:
    paths = glob.glob(path_glob)
    if not paths:
        logging.warning(f"No operations log files found: {path_glob}")
        return pd.DataFrame()
    frames = []
    for p in paths:
        if p.lower().endswith(".csv"):
            tmp = pd.read_csv(p)
        elif p.lower().endswith(".jsonl") or p.lower().endswith(".ndjson"):
            tmp = pd.read_json(p, lines=True)
        elif p.lower().endswith(".json"):
            tmp = pd.read_json(p)
        else:
            logging.warning(f"Unsupported file type, skipping: {p}")
            continue
        frames.append(tmp)
    if not frames:
        return pd.DataFrame()
    df = pd.concat(frames, ignore_index=True)
    # Field mapping: adjust to your actual log format
    # Target fields: event_id, event_type, occurred_at, user_email, account_domain, metadata (json)
    # Try to map common field names
    candidate_map = {
        "id": "event_id",
        "event_id": "event_id",
        "type": "event_type",
        "event_type": "event_type",
        "timestamp": "occurred_at",
        "occurred_at": "occurred_at",
        "user.email": "user_email",
        "user_email": "user_email",
        "account.domain": "account_domain",
        "account_domain": "account_domain",
        "metadata": "metadata",
        "data": "metadata",
    }
    rename_map = {k: v for k, v in candidate_map.items() if k in df.columns}
    df = df.rename(columns=rename_map)
    keep = ["event_id", "event_type", "occurred_at", "user_email", "account_domain", "metadata"]
    df = df[[c for c in keep if c in df.columns]].copy()
    # Serialize metadata to JSON strings for easier persistence
    if "metadata" in df.columns:
        df["metadata"] = df["metadata"].apply(
            lambda x: json.dumps(x, ensure_ascii=False)
            if isinstance(x, (dict, list))
            else (x if isinstance(x, str) else None)
        )
    return df

# -------------------------
# Extraction: data warehouse (customer master data, etc.)
# -------------------------
def query_dw(sql: str) -> pd.DataFrame:
    if not DW_SQLALCHEMY_URL:
        logging.warning("DW_SQLALCHEMY_URL not set; skipping data warehouse query")
        return pd.DataFrame()
    engine = create_engine(DW_SQLALCHEMY_URL)
    with engine.connect() as conn:
        df = pd.read_sql(text(sql), conn)
    return df


def fetch_dw_customers() -> pd.DataFrame:
    # Adjust the query to your actual customer master data table
    sql = """
        SELECT
            customer_id,
            customer_name,
            domain,        -- ideally maintain the primary domain in master data
            website,
            updated_at
        FROM dim_customer
    """
    try:
        return query_dw(sql)
    except Exception as e:
        logging.warning(f"Failed to query customer master data from the warehouse: {e}")
        return pd.DataFrame()

# -------------------------
# Standardization and entity resolution
# -------------------------
def standardize_crm_contacts(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    df["contact_email"] = df["contact_email"].apply(normalize_email) if "contact_email" in df.columns else None
    if "account_website" in df.columns:
        df["account_domain"] = df["account_website"].apply(extract_domain_from_url)
    else:
        df["account_domain"] = None
    if "updated_at" in df.columns:
        df["updated_at"] = df["updated_at"].apply(parse_ts_to_utc)
    return df


def standardize_crm_deals(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "contact_email" in df.columns:
        df["contact_email"] = df["contact_email"].apply(normalize_email)
    # Prefer the domain from contact_email, falling back to account_website
    email_domain = df["contact_email"].apply(extract_domain_from_email) if "contact_email" in df.columns else pd.Series([None] * len(df))
    website_domain = df["account_website"].apply(extract_domain_from_url) if "account_website" in df.columns else pd.Series([None] * len(df))
    df["account_domain"] = email_domain.fillna(website_domain)
    if "close_date" in df.columns:
        df["close_date"] = df["close_date"].apply(parse_ts_to_utc)
    if "updated_at" in df.columns:
        df["updated_at"] = df["updated_at"].apply(parse_ts_to_utc)
    # Numeric normalization
    if "amount" in df.columns:
        df["amount"] = pd.to_numeric(df["amount"], errors="coerce")
    return df


def standardize_tickets(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "requester_email" in df.columns:
        df["requester_email"] = df["requester_email"].apply(normalize_email)
        df["requester_domain"] = df["requester_email"].apply(extract_domain_from_email)
    else:
        df["requester_domain"] = None
    for col in ["created_at", "updated_at"]:
        if col in df.columns:
            df[col] = df[col].apply(parse_ts_to_utc)
    return df


def standardize_ops_logs(df: pd.DataFrame) -> pd.DataFrame:
    if df.empty:
        return df
    if "user_email" in df.columns:
        df["user_email"] = df["user_email"].apply(normalize_email)
        df["user_domain"] = df["user_email"].apply(extract_domain_from_email)
    else:
        df["user_domain"] = None
    if "account_domain" in df.columns:
        df["account_domain"] = df["account_domain"].apply(normalize_domain)
    if "occurred_at" in df.columns:
        df["occurred_at"] = df["occurred_at"].apply(parse_ts_to_utc)
    return df


def build_dim_customer(dw_customers: pd.DataFrame,
                       crm_contacts: pd.DataFrame,
                       crm_deals: pd.DataFrame,
                       tickets: pd.DataFrame,
                       ops_logs: pd.DataFrame) -> pd.DataFrame:
    """Build the customer dimension: prefer DW master data; supplement with the set of
    domains from CRM/tickets/logs and generate stable IDs."""
    candidates = []

    # DW master data (preferred)
    if not dw_customers.empty:
        df = dw_customers.copy()
        if "domain" in df.columns:
            df["domain"] = df["domain"].apply(normalize_domain)
        else:
            df["domain"] = df["website"].apply(extract_domain_from_url) if "website" in df.columns else None
        df = df[["customer_id", "customer_name", "domain", "website", "updated_at"]].copy()
        candidates.append(df)

    # CRM account domains (from contacts and deals)
    if not crm_contacts.empty:
        tmp = crm_contacts.copy()
        tmp["domain"] = tmp["account_domain"]
        tmp["customer_name"] = tmp.get("account_name")
        tmp["website"] = tmp.get("account_website")
        tmp = tmp[["customer_name", "domain", "website"]]
        candidates.append(tmp)
    if not crm_deals.empty:
        tmp = crm_deals.copy()
        tmp["domain"] = tmp["account_domain"]
        tmp["customer_name"] = tmp.get("account_name")
        tmp["website"] = tmp.get("account_website")
        tmp = tmp[["customer_name", "domain", "website"]]
        candidates.append(tmp)

    # Ticket domains
    if not tickets.empty:
        tmp = tickets.copy()
        tmp["domain"] = tmp["requester_domain"]
        tmp["customer_name"] = tmp.get("organization")
        tmp["website"] = None
        tmp = tmp[["customer_name", "domain", "website"]]
        candidates.append(tmp)

    # Log domains
    if not ops_logs.empty:
        tmp = ops_logs.copy()
        # Prefer account_domain, then user_domain
        tmp["domain"] = tmp["account_domain"].fillna(tmp["user_domain"])
        tmp["customer_name"] = None
        tmp["website"] = None
        tmp = tmp[["customer_name", "domain", "website"]]
        candidates.append(tmp)

    if not candidates:
        return pd.DataFrame(columns=["customer_id", "customer_name", "domain", "website", "source"])

    all_cand = pd.concat(candidates, ignore_index=True)
    all_cand["domain"] = all_cand["domain"].apply(normalize_domain)
    all_cand = all_cand.dropna(subset=["domain"]).drop_duplicates(subset=["domain"]).reset_index(drop=True)

    # Merge in customer_id from DW master data; generate a stable domain-based ID where missing
    if not dw_customers.empty and "customer_id" in dw_customers.columns:
        dw_map = dw_customers.copy()
        dw_map["domain"] = dw_map["domain"].apply(normalize_domain)
        dw_map = dw_map.dropna(subset=["domain"])
        dim = all_cand.merge(dw_map[["domain", "customer_id", "customer_name", "website"]],
                             on="domain", how="left", suffixes=("", "_dw"))
        # Prefer the DW name/website
        dim["customer_name"] = dim["customer_name_dw"].fillna(dim["customer_name"])
        dim["website"] = dim["website_dw"].fillna(dim["website"])
        dim = dim.drop(columns=[c for c in dim.columns if c.endswith("_dw")])
    else:
        dim = all_cand.copy()
        dim["customer_id"] = None

    # Generate missing IDs
    mask = dim["customer_id"].isna()
    dim.loc[mask, "customer_id"] = dim.loc[mask, "domain"].apply(stable_id_from_domain)

    # Normalize columns
    dim["customer_name"] = dim["customer_name"].fillna(dim["domain"])
    dim["website"] = dim["website"].apply(lambda x: x if isinstance(x, str) else None)
    dim = dim[["customer_id", "customer_name", "domain", "website"]].copy()
    return dim


def map_customer_id(df: pd.DataFrame, dim_customer: pd.DataFrame,
                    domain_col: str, out_col: str = "customer_id") -> pd.DataFrame:
    if df.empty:
        df[out_col] = None
        return df
    tmp = df.copy()
    tmp = tmp.merge(dim_customer[["domain", "customer_id"]],
                    left_on=domain_col, right_on="domain", how="left")
    tmp = tmp.drop(columns=["domain_y"], errors="ignore").rename(columns={"domain_x": domain_col})
    # Records without a domain match keep a missing customer_id
    return tmp

# -------------------------
# Fact table construction
# -------------------------
def build_fact_sales(crm_deals: pd.DataFrame, dim_customer: pd.DataFrame) -> pd.DataFrame:
    if crm_deals.empty:
        return pd.DataFrame(columns=["deal_id", "deal_name", "amount", "stage", "close_date",
                                     "owner", "customer_id", "account_domain"])
    df = crm_deals.copy()
    df = map_customer_id(df, dim_customer, domain_col="account_domain", out_col="customer_id")
    cols = ["deal_id", "deal_name", "amount", "stage", "close_date", "owner", "customer_id", "account_domain"]
    df = df[[c for c in cols if c in df.columns]]
    return df


def build_fact_interactions(tickets: pd.DataFrame, ops_logs: pd.DataFrame,
                            dim_customer: pd.DataFrame) -> pd.DataFrame:
    frames = []

    # Tickets -> interactions
    if not tickets.empty:
        t = tickets.copy()
        t["interaction_type"] = "ticket"
        t["occurred_at"] = t["created_at"] if "created_at" in t.columns else None
        t["actor"] = t["requester_email"] if "requester_email" in t.columns else None
        t["domain"] = t["requester_domain"]
        t["payload"] = t.apply(lambda r: json.dumps({
            "ticket_id": r.get("ticket_id"),
            "subject": r.get("subject"),
            "status": r.get("status"),
            "priority": r.get("priority"),
            "assignee": r.get("assignee"),
            "channel": r.get("channel"),
        }, ensure_ascii=False), axis=1)
        t = map_customer_id(t, dim_customer, domain_col="domain", out_col="customer_id")
        frames.append(t[["interaction_type", "occurred_at", "actor", "customer_id", "domain", "payload"]])

    # Operations logs -> interactions
    if not ops_logs.empty:
        o = ops_logs.copy()
        o["interaction_type"] = o.get("event_type", "log")
        o["actor"] = o.get("user_email")
        # Prefer account_domain, then user_domain
        dom = o.get("account_domain")
        if dom is None:
            dom = o.get("user_domain")
        o["domain"] = dom
        # Keep metadata as the payload
        o["payload"] = o.get("metadata")
        o = map_customer_id(o, dim_customer, domain_col="domain", out_col="customer_id")
        # Unify the occurred_at column name
        if "occurred_at" not in o.columns:
            o["occurred_at"] = None
        frames.append(o[["interaction_type", "occurred_at", "actor", "customer_id", "domain", "payload"]])

    if not frames:
        return pd.DataFrame(columns=["interaction_type", "occurred_at", "actor", "customer_id", "domain", "payload"])

    inter = pd.concat(frames, ignore_index=True)
    return inter

# -------------------------
# Main flow
# -------------------------
def main():
    since_iso = SINCE_DT.isoformat()
    logging.info(f"Incremental extraction start time: {since_iso}")

    # Extract
    crm_contacts = fetch_crm_contacts(since_iso)
    crm_deals = fetch_crm_deals(since_iso)
    tickets = fetch_tickets(since_iso)
    ops_logs = load_ops_logs(OPS_LOG_PATH)
    dw_customers = fetch_dw_customers()
    logging.info(f"CRM contacts: {len(crm_contacts)}, CRM deals: {len(crm_deals)}, tickets: {len(tickets)}, logs: {len(ops_logs)}, dw_customers: {len(dw_customers)}")

    # Standardize
    crm_contacts = standardize_crm_contacts(crm_contacts)
    crm_deals = standardize_crm_deals(crm_deals)
    tickets = standardize_tickets(tickets)
    ops_logs = standardize_ops_logs(ops_logs)

    # Entity resolution (customer dimension)
    dim_customer = build_dim_customer(dw_customers, crm_contacts, crm_deals, tickets, ops_logs)
    logging.info(f"dim_customer rows: {len(dim_customer)}")

    # Fact tables
    fact_sales = build_fact_sales(crm_deals, dim_customer)
    fact_interactions = build_fact_interactions(tickets, ops_logs, dim_customer)
    logging.info(f"fact_sales rows: {len(fact_sales)}, fact_interactions rows: {len(fact_interactions)}")

    # Output
    safe_to_parquet_or_csv(dim_customer, os.path.join(OUTPUT_DIR, "dim_customer"))
    safe_to_parquet_or_csv(fact_sales, os.path.join(OUTPUT_DIR, "fact_sales"))
    safe_to_parquet_or_csv(fact_interactions, os.path.join(OUTPUT_DIR, "fact_interactions"))

    # To write the results back to the data warehouse, uncomment the block below; make sure
    # DW_SQLALCHEMY_URL is writable and the target tables match, or allow if_exists="replace"/"append"
    """
    if DW_SQLALCHEMY_URL:
        engine = create_engine(DW_SQLALCHEMY_URL)
        with engine.begin() as conn:
            dim_customer.to_sql("dim_customer_unified", conn, if_exists="replace", index=False)
            fact_sales.to_sql("fact_sales_unified", conn, if_exists="replace", index=False)
            fact_interactions.to_sql("fact_interactions_unified", conn, if_exists="replace", index=False)
        logging.info("Unified tables written back to the data warehouse")
    """

    logging.info("Integration pipeline finished")


if __name__ == "__main__":
    main()
------------------------------------------------------------
Implementation and validation suggestions
- Field mapping alignment: adjust the field-mapping sections (the field_map in fetch_crm_contacts/fetch_crm_deals/fetch_tickets/load_ops_logs) to the fields of your CRM, ticketing system, and logs.
- Pagination and rate limiting: your actual API's pagination parameter names and response structure may differ; adjust the parameters and data_path of fetch_paginated_list.
- Entity resolution coverage: report the share of missing customer_id values in fact_sales and fact_interactions, review the unmatched domains, and improve data quality (e.g., fill in websites, normalize emails, maintain a domain-alias mapping); a sketch of such a check follows this list.
- Time unification: make sure all timestamps are standardized to UTC to avoid cross-system alignment errors.
- Data sensitivity: handle PII carefully in the outputs; customize the payload construction if you need masking or a field whitelist.
Below is a self-contained Python script that integrates three sources (instrumented tracking events, survey results, and user feedback) into clean, analysis-ready datasets. It focuses on robust preprocessing, schema normalization, timestamp unification (to UTC), deduplication, safe handling of potential PII via hashing, and time-aware joins (nearest "backward" match within a configurable window). It produces two outputs: an event-level integrated table and a user-level feature table.

Key assumptions
- The three inputs can be CSV, JSON (records list or JSONL), or Parquet files.
- Each source includes a user identifier and a timestamp column (customizable via a config).
- Naive timestamps are localized using a configurable default timezone (e.g., Asia/Shanghai), then converted to UTC.
- PII columns (if present) can be hashed using a salt; raw PII is not preserved.
- Survey scoring is left as-is if not numeric; you can provide score columns in the config.
- The nearest-join attaches the nearest prior survey/feedback to each event within a configurable window (e.g., 7 days).

How to run
- Save as integrate_data.py
- Example: python integrate_data.py --events path/to/events.csv --surveys path/to/surveys.jsonl --feedback path/to/feedback.parquet --outdir ./out --window-days 7 --default-tz "Asia/Shanghai" --salt "your_salt" --config path/to/column_mapping.json
- If you don't provide a config, sensible defaults are attempted (see DEFAULT_CONFIG in the script).

Script: integrate_data.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Integrate tracking events, survey results, and user feedback into:
1) Event-level table with nearest (backward) survey/feedback enrichment.
2) User-level feature table with aggregates.

Dependencies:
- Python 3.9+
- pandas>=1.5
- numpy
- pyarrow (optional; for Parquet output)

Notes:
- Timestamps are standardized to UTC.
- PII columns can be hashed with SHA-256 and a provided salt.
- The script supports CSV, JSON/JSONL, and Parquet inputs.
"""

import argparse
import hashlib
import json
import logging
import re
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np
import pandas as pd

# -----------------------
# Default column mappings
# -----------------------
DEFAULT_CONFIG = {
    "events": {
        # Rename your source columns to these canonical names
        "user_id": "user_id",
        "timestamp": "event_time",
        "event_name": "event_name",
        # event_properties can be a JSON string or dict with extra fields
        "event_properties": "event_properties"
    },
    "surveys": {
        "user_id": "user_id",
        "timestamp": "submitted_at",
        # Provide numeric score columns if available (e.g., Likert or 0-10)
        "score_cols": [],  # e.g., ["q1_score", "q2_score"]
        "nps_col": None    # e.g., "nps_score"
    },
    "feedback": {
        "user_id": "user_id",
        "timestamp": "created_at",
        "text_col": "feedback_text",
        "rating_col": None  # e.g., "star_rating"
    },
    "pii": {
        # Any PII columns present in any source will be hashed with SHA-256 + salt
        "emails": ["email", "user_email"],
        "phones": ["phone", "mobile"],
        "other": []  # add any other PII columns if needed
    }
}

# -----------------------
# Logging
# -----------------------
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s"
)
logger = logging.getLogger("integrator")

# -----------------------
# IO helpers
# -----------------------
def load_table(path: Path) -> pd.DataFrame:
    """Load a table from CSV, JSON/JSONL, or Parquet."""
    suffix = path.suffix.lower()
    if suffix in [".csv"]:
        return pd.read_csv(path)
    if suffix in [".parquet", ".pq"]:
        return pd.read_parquet(path)
    if suffix in [".json", ".jsonl"]:
        # Try JSON Lines first
        try:
            return pd.read_json(path, lines=True)
        except Exception:
            # Fallback to standard JSON array
            with open(path, "r", encoding="utf-8") as f:
                data = json.load(f)
            if isinstance(data, list):
                return pd.json_normalize(data)
            elif isinstance(data, dict):
                # Try to find a list under a top-level key
                for k, v in data.items():
                    if isinstance(v, list):
                        return pd.json_normalize(v)
                # As a last resort, make a single-row frame
                return pd.json_normalize(data)
    raise ValueError(f"Unsupported file format for: {path}")


def write_output(df: pd.DataFrame, out_path: Path) -> None:
    """Write DataFrame to Parquet if available, else CSV."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    try:
        df.to_parquet(out_path.with_suffix(".parquet"), index=False)
        logger.info(f"Wrote {out_path.with_suffix('.parquet')}")
    except Exception as e:
        logger.warning(f"Parquet write failed ({e}); falling back to CSV.")
        df.to_csv(out_path.with_suffix(".csv"), index=False)
        logger.info(f"Wrote {out_path.with_suffix('.csv')}")

# -----------------------
# Preprocessing helpers
# -----------------------
def enforce_str_id(df: pd.DataFrame, col: str) -> pd.DataFrame:
    if col not in df.columns:
        raise KeyError(f"Missing required id column: {col}")
    df[col] = df[col].astype("string")
    return df


def to_utc(df: pd.DataFrame, ts_col: str, default_tz: str) -> pd.Series:
    if ts_col not in df.columns:
        raise KeyError(f"Missing required timestamp column: {ts_col}")
    s = pd.to_datetime(df[ts_col], errors="coerce", utc=False)
    # Check if tz-aware
    tz_aware = hasattr(s.dtype, "tz") and s.dtype.tz is not None
    if not tz_aware:
        # Localize naive timestamps then convert to UTC
        try:
            s = s.dt.tz_localize(default_tz, nonexistent="shift_forward", ambiguous="NaT")
        except TypeError:
            # Older pandas may not support nonexistent/ambiguous kwargs
            s = s.dt.tz_localize(default_tz)
    s = s.dt.tz_convert("UTC")
    return s


def parse_event_properties(df: pd.DataFrame, prop_col: str) -> pd.DataFrame:
    if prop_col not in df.columns:
        return df

    # If stringified JSON, try parsing and expanding
    def _parse(x):
        if isinstance(x, dict):
            return x
        if isinstance(x, str):
            x = x.strip()
            if x.startswith("{") and x.endswith("}"):
                try:
                    return json.loads(x)
                except Exception:
                    return None
        return None

    props = df[prop_col].map(_parse)
    if props.notna().any():
        props_df = pd.json_normalize(props)
        props_df.columns = [f"prop_{c}" for c in props_df.columns]
        df = pd.concat([df.drop(columns=[prop_col]), props_df], axis=1)
    return df


def hash_series(s: pd.Series, salt: str) -> pd.Series:
    def _hash(x):
        if pd.isna(x):
            return pd.NA
        m = hashlib.sha256()
        m.update(salt.encode("utf-8"))
        m.update(str(x).encode("utf-8"))
        return m.hexdigest()
    return s.astype("string").map(_hash)


def hash_pii(df: pd.DataFrame, pii_cfg: Dict, salt: str) -> pd.DataFrame:
    pii_cols = pii_cfg.get("emails", []) + pii_cfg.get("phones", []) + pii_cfg.get("other", [])
    if not salt:
        if any(col in df.columns for col in pii_cols):
            logger.warning("No salt provided; PII columns will remain unhashed.")
        return df
    for col in pii_cols:
        if col in df.columns:
            df[col] = hash_series(df[col], salt)
    return df


def clean_text(s: pd.Series) -> pd.Series:
    # Simple normalization useful for feedback text
    url_re = re.compile(r"https?://\S+|www\.\S+")
    s = s.astype("string")
    s = s.str.replace(url_re, "", regex=True)
    s = s.str.replace(r"\s+", " ", regex=True).str.strip()
    return s


def deduplicate_events(df: pd.DataFrame, user_col: str, ts_col: str, name_col: str) -> pd.DataFrame:
    subset = [c for c in [user_col, ts_col, name_col] if c in df.columns]
    if not subset:
        return df
    df = df.drop_duplicates(subset=subset, keep="first")
    return df

# -----------------------
# Integration helpers
# -----------------------
def prepare_events(events: pd.DataFrame, cfg: Dict, default_tz: str, salt: str) -> pd.DataFrame:
    remap = cfg["events"]
    events = events.rename(columns={
        remap.get("user_id", "user_id"): "user_id",
        remap.get("timestamp", "event_time"): "event_time",
        remap.get("event_name", "event_name"): "event_name",
        remap.get("event_properties", "event_properties"): "event_properties"
    })
    events = enforce_str_id(events, "user_id")
    events["event_ts"] = to_utc(events, "event_time", default_tz)
    events = parse_event_properties(events, "event_properties")
    events["event_name"] = events.get("event_name", pd.Series(index=events.index, dtype="string")).astype("string")
    events = hash_pii(events, cfg.get("pii", {}), salt)
    events = deduplicate_events(events, "user_id", "event_ts", "event_name")
    # Optimize dtypes
    events["event_name"] = events["event_name"].astype("category")
    events = events.drop(columns=[c for c in ["event_time"] if c in events.columns])
    return events


def prepare_surveys(surveys: pd.DataFrame, cfg: Dict, default_tz: str, salt: str) -> pd.DataFrame:
    remap = cfg["surveys"]
    surveys = surveys.rename(columns={
        remap.get("user_id", "user_id"): "user_id",
        remap.get("timestamp", "submitted_at"): "survey_time"
    })
    surveys = enforce_str_id(surveys, "user_id")
    surveys["survey_ts"] = to_utc(surveys, "survey_time", default_tz)
    surveys = hash_pii(surveys, cfg.get("pii", {}), salt)
    # Convert specified score columns to numeric if present
    score_cols = remap.get("score_cols", []) or []
    for col in score_cols:
        if col in surveys.columns:
            surveys[f"score_{col}"] = pd.to_numeric(surveys[col], errors="coerce")
    nps_col = remap.get("nps_col")
    if nps_col and nps_col in surveys.columns:
        surveys["nps_score"] = pd.to_numeric(surveys[nps_col], errors="coerce")

        # Optionally derive NPS category
        def _nps_bucket(x):
            if pd.isna(x):
                return pd.NA
            x = float(x)
            if x >= 9:
                return "promoter"
            if x >= 7:
                return "passive"
            return "detractor"

        surveys["nps_bucket"] = surveys["nps_score"].map(_nps_bucket).astype("category")
    surveys = surveys.drop(columns=[c for c in ["survey_time"] if c in surveys.columns])
    return surveys


def prepare_feedback(feedback: pd.DataFrame, cfg: Dict, default_tz: str, salt: str) -> pd.DataFrame:
    remap = cfg["feedback"]
    feedback = feedback.rename(columns={
        remap.get("user_id", "user_id"): "user_id",
        remap.get("timestamp", "created_at"): "feedback_time",
        remap.get("text_col", "feedback_text"): "feedback_text",
        remap.get("rating_col", "rating"): "feedback_rating"
    })
    feedback = enforce_str_id(feedback, "user_id")
    feedback["feedback_ts"] = to_utc(feedback, "feedback_time", default_tz)
    if "feedback_text" in feedback.columns:
        feedback["feedback_text"] = clean_text(feedback["feedback_text"])
        feedback["feedback_char_len"] = feedback["feedback_text"].str.len()
        feedback["feedback_word_count"] = feedback["feedback_text"].str.split().map(lambda x: len(x) if isinstance(x, list) else pd.NA)
    if "feedback_rating" in feedback.columns:
        feedback["feedback_rating"] = pd.to_numeric(feedback["feedback_rating"], errors="coerce")
    feedback = hash_pii(feedback, cfg.get("pii", {}), salt)
    feedback = feedback.drop(columns=[c for c in ["feedback_time"] if c in feedback.columns])
    return feedback


def nearest_backward_join(
    left: pd.DataFrame,
    right: pd.DataFrame,
    left_ts: str,
    right_ts: str,
    by: str,
    tolerance: pd.Timedelta,
    suffix: str
) -> pd.DataFrame:
    if right.empty:
        # No-op if right side is empty
        for c in right.columns:
            if c not in [by, right_ts]:
                left[f"{c}{suffix}"] = pd.NA
        return left
    # merge_asof requires both frames to be globally sorted by the time key
    left = left.sort_values(left_ts)
    right = right.sort_values(right_ts)
    # Ensure same dtypes for 'by'
    left[by] = left[by].astype("string")
    right[by] = right[by].astype("string")
    merged = pd.merge_asof(
        left,
        right,
        left_on=left_ts,
        right_on=right_ts,
        by=by,
        direction="backward",
        tolerance=tolerance,
        suffixes=("", suffix)
    )
    # Collisions between left and right column names get the suffix via the
    # suffixes parameter (merge_asof applies suffixes only to overlapping names).
    return merged


def build_user_features(
    events: pd.DataFrame,
    enriched_events: pd.DataFrame,
    surveys_cfg: Dict
) -> pd.DataFrame:
    # User-level aggregates from the event stream
    g = events.groupby("user_id", observed=True)
    user_feat = g.size().rename("total_events").reset_index()

    # Active days and first/last event timestamps
    if "event_ts" in events.columns:
        user_days = events.assign(day=events["event_ts"].dt.date).groupby("user_id", observed=True)["day"].nunique().rename("active_days")
        user_feat = user_feat.merge(user_days, on="user_id", how="left")
        first_last = g["event_ts"].agg(["min", "max"]).rename(columns={"min": "first_event_ts", "max": "last_event_ts"}).reset_index()
        user_feat = user_feat.merge(first_last, on="user_id", how="left")

    # Survey aggregates if present on enriched events
    score_cols = surveys_cfg.get("score_cols", []) or []
    for col in score_cols:
        colname = f"score_{col}"
        if colname in enriched_events.columns:
            agg = enriched_events.groupby("user_id", observed=True)[colname].mean().rename(f"avg_{colname}")
            user_feat = user_feat.merge(agg, on="user_id", how="left")
    if "nps_score" in enriched_events.columns:
        nps_agg = enriched_events.groupby("user_id", observed=True)["nps_score"].mean().rename("avg_nps_score")
        user_feat = user_feat.merge(nps_agg, on="user_id", how="left")
    if "nps_bucket" in enriched_events.columns:
        # Distribution by bucket
        nps_counts = pd.crosstab(enriched_events["user_id"], enriched_events["nps_bucket"]).add_prefix("nps_")
        user_feat = user_feat.merge(nps_counts.reset_index(), on="user_id", how="left")

    # Feedback aggregates (if text/rating exist)
    if "feedback_rating" in enriched_events.columns:
        fb_rating = enriched_events.groupby("user_id", observed=True)["feedback_rating"].mean().rename("avg_feedback_rating")
        user_feat = user_feat.merge(fb_rating, on="user_id", how="left")
    if "feedback_char_len" in enriched_events.columns:
        fb_len = enriched_events.groupby("user_id", observed=True)["feedback_char_len"].mean().rename("avg_feedback_char_len")
        user_feat = user_feat.merge(fb_len, on="user_id", how="left")
    return user_feat

# -----------------------
# Main
# -----------------------
def main():
    parser = argparse.ArgumentParser(description="Integrate events, surveys, and feedback.")
    parser.add_argument("--events", required=True, type=Path, help="Path to events file (csv/json/jsonl/parquet)")
    parser.add_argument("--surveys", required=True, type=Path, help="Path to surveys file (csv/json/jsonl/parquet)")
    parser.add_argument("--feedback", required=True, type=Path, help="Path to feedback file (csv/json/jsonl/parquet)")
    parser.add_argument("--outdir", required=True, type=Path, help="Output directory")
    parser.add_argument("--config", type=Path, default=None, help="Path to JSON config for column mapping")
    parser.add_argument("--window-days", type=float, default=7.0, help="Backward join window in days for survey/feedback")
    parser.add_argument("--default-tz", type=str, default="UTC", help="Timezone to localize naive timestamps (e.g., 'Asia/Shanghai')")
    parser.add_argument("--salt", type=str, default="", help="Salt for hashing PII columns. If empty, PII not hashed.")
    args = parser.parse_args()

    # Load config
    if args.config and args.config.exists():
        with open(args.config, "r", encoding="utf-8") as f:
            cfg = json.load(f)

        # Merge with defaults so missing keys fall back
        def deep_update(base, upd):
            for k, v in upd.items():
                if isinstance(v, dict) and isinstance(base.get(k), dict):
                    deep_update(base[k], v)
                else:
                    base[k] = v

        cfg_full = json.loads(json.dumps(DEFAULT_CONFIG))  # deep copy
        deep_update(cfg_full, cfg)
        cfg = cfg_full
    else:
        cfg = DEFAULT_CONFIG

    # Load sources
    logger.info("Loading input tables...")
    events = load_table(args.events)
    surveys = load_table(args.surveys)
    feedback = load_table(args.feedback)

    logger.info("Preprocessing events...")
    events = prepare_events(events, cfg, args.default_tz, args.salt)
    logger.info(f"Events shape after prep: {events.shape}")

    logger.info("Preprocessing surveys...")
    surveys = prepare_surveys(surveys, cfg, args.default_tz, args.salt)
    logger.info(f"Surveys shape after prep: {surveys.shape}")

    logger.info("Preprocessing feedback...")
    feedback = prepare_feedback(feedback, cfg, args.default_tz, args.salt)
    logger.info(f"Feedback shape after prep: {feedback.shape}")

    # Nearest backward joins on user_id within tolerance
    tolerance = pd.Timedelta(days=args.window_days)

    logger.info(f"Joining surveys to events with backward tolerance {tolerance}...")
    events_enriched = nearest_backward_join(
        left=events,
        right=surveys,
        left_ts="event_ts",
        right_ts="survey_ts",
        by="user_id",
        tolerance=tolerance,
        suffix="_survey"
    )

    logger.info(f"Joining feedback to events with backward tolerance {tolerance}...")
    events_enriched = nearest_backward_join(
        left=events_enriched,
        right=feedback,
        left_ts="event_ts",
        right_ts="feedback_ts",
        by="user_id",
        tolerance=tolerance,
        suffix="_feedback"
    )

    # Output 1: event-level integrated dataset
    out_events = args.outdir / "integrated_events"
    write_output(events_enriched, out_events)

    # Output 2: user-level features
    logger.info("Building user-level features...")
    user_features = build_user_features(events, events_enriched, cfg.get("surveys", {}))
    out_users = args.outdir / "user_features"
    write_output(user_features, out_users)

    logger.info("Done.")


if __name__ == "__main__":
    main()

Notes and guidance
- Column mapping: If your source column names differ, provide a config JSON to map them. Example config:
  {
    "events": { "user_id": "uid", "timestamp": "ts", "event_name": "name", "event_properties": "props" },
    "surveys": { "user_id": "uid", "timestamp": "submitted_at", "score_cols": ["q1_score", "q2_score"], "nps_col": "nps" },
    "feedback": { "user_id": "uid", "timestamp": "created_at", "text_col": "comment", "rating_col": "stars" },
    "pii": { "emails": ["email"], "phones": ["phone"], "other": ["full_name"] }
  }
- Timezone handling: For naive timestamps, set --default-tz to the timezone your data was recorded in (e.g., Asia/Shanghai). The script converts everything to UTC.
- Join behavior: Each event row will include the nearest prior survey and feedback (if within the tolerance window). Adjust --window-days to match your analysis horizon. A small toy illustration of this nearest-backward join follows these notes.
- PII handling: Provide a strong salt via --salt to hash any PII columns listed in the config. If omitted, PII columns will remain unhashed.
- Scalability: For very large datasets, consider chunked reads and writes, or migrating to a distributed framework. The logic here is organized so you can port it to Spark with minimal conceptual changes (e.g., use window functions for nearest joins).
Quickly generate integration scripts that bring sales, operations, customer-service and other multi-source data onto consistent definitions, build a stable intermediate layer, and improve data-delivery efficiency while reducing rework.
Merge ad-placement, channel, and on-site behavior data with automatic deduplication and attribution, quickly produce conversion-funnel and ROI base tables, and shorten evaluation and review cycles.
Connect tracking events, surveys, and user feedback, clean out anomalous and invalid events, and output the datasets needed for retention, segmentation, and feature-usage analysis.
Integrate inventory, order, logistics, and return data to generate daily monitoring base tables that support stock-out alerts, campaign reviews, and supply planning.
Connect data from multiple institutions and public sources, unify feature and time definitions, and speed up scorecard training, batch evaluation, and strategy iteration.
Merge open data from multiple sources and standardize metrics to quickly build reproducible experiment datasets and improve the data reliability of papers and research reports.
Use a single high-quality prompt to drive the AI to work like a data mining expert and, within minutes, generate Python data-integration scripts that are directly runnable, reusable, and easy to hand over. It merges multi-source data (spreadsheets, databases, online services, logs, cloud storage, and more) in one pass, automatically handling field mapping, deduplication, missing-value treatment, and time alignment, and it comes with clear operating instructions and a quality-validation checklist. The goal is to significantly reduce manual stitching and communication costs, lower error rates, shorten the turnaround for reports and dashboards, and build up a reusable library of integration templates that helps teams move quickly from trial to production and large-scale reuse.
Copy the prompt generated by the template into your preferred chat app (such as ChatGPT or Claude) and use it directly in conversation, with no extra development. Suitable for quick individual trials and lightweight use.
Turn the prompt template into an API: your program can modify the template parameters freely and call it directly through the interface, making automation and batch processing straightforward. Suitable for developer integration and embedding into business systems.
Configure the corresponding server address in your MCP client so that your AI application can invoke the prompt template automatically. Suitable for advanced users and team collaboration, letting prompts move seamlessly between different AI tools.