Describes a table's data lineage starting from a specified system; suited to technical scenarios.
The following describes the data lineage from the e-commerce transaction system to the warehouse table sales_orders. It covers source systems, ingestion, landing, normalization, aggregate modeling, field-level lineage, update strategy, and data quality controls, using an order-level fact grain (one record per order).

1. Scope and grain
- Target table: dw.gold.sales_orders (order-level fact table)
- Grain: one row per order (order_id). Line items (order_items) are aggregated in the intermediate layer.
- Time: partitioned and snapshotted by order creation time (order_created_at). Updates use incremental merge (CDC).

2. Upstream source system (OLTP)
- Typical tables:
  - oltp.orders (order header): id, order_no, customer_id, store_id, channel, currency, created_at, updated_at, status, shipping_fee, tax_summary
  - oltp.order_items (order lines): order_id, item_id, sku_id, quantity, unit_price, item_discount
  - oltp.order_discounts (discounts): order_id, discount_type (coupon/promotion), discount_amount
  - oltp.payments (payments): order_id, payment_id, method, authorized_amount, captured_amount, refunded_amount, currency
  - oltp.shipments (fulfillment/shipping): order_id, shipment_id, shipped_quantity, shipping_status, shipping_fee_override
  - oltp.taxes (tax detail, optional): order_id, tax_type, tax_amount
- Change capture: inserts, updates, and deletes (cancel/void) flow out through CDC.

3. Ingestion layer (CDC/Streaming)
- Flow:
  - CDC (for example, transaction-log-based change capture) pushes DML changes on oltp.* to a message stream (such as Kafka/Kinesis).
  - Event types: insert/update/delete (tombstone), including before and after values and the transaction time.
- Metadata:
  - source_system, table_name, op_type, op_ts, tx_id, schema_version.
- Purpose:
  - Retain raw events so the pipeline can be replayed idempotently.

4. Raw landing layer (Bronze/Raw)
- Storage: data_lake.raw.oltp_orders, raw.oltp_order_items, raw.oltp_payments, and so on (partitioned by ingestion_date/region).
- Characteristics:
  - Stored as-is, preserving source fields, data types, and time zones.
  - No business corrections; only basic cleansing (deduplication on tx_id combined with the primary key).

5. Normalization layer (Silver/Curated)
- Tables and processing:
  - silver.orders: deduplicate raw.oltp_orders, unify time zones (convert to UTC), standardize status enums, correct shipping_fee, expand the tax fields.
  - silver.order_items: cleanse unit price and quantity, drop negative or abnormal values; keep item-level discounts.
  - silver.order_discounts: aggregate order-level discounts (coupons and promotions on the order header).
  - silver.payments: consolidate authorization/capture/refund per order; make the payment status explicit.
  - silver.shipments: fulfillment status and shipping fee corrections.
- Dimension mapping:
  - dim_customer (SCD2): map the natural key customer_id to the surrogate key customer_sk.
  - dim_store, dim_channel, dim_product (store/channel are kept at order level; product is used at the line level only when derived statistics are needed).
  - dim_fx_rates: take the exchange rate effective on the order date for the order currency.
- Merge strategy:
  - MERGE on primary key + event time, keeping the latest valid version; delete events produce a soft-delete flag.

6. Aggregate layer (Gold/Model): building sales_orders
- Target table: dw.gold.sales_orders
- Primary key: order_id (one-to-one with the source system); fallback business key: order_no + store_id.
- Dimension foreign keys: customer_sk, store_sk, channel_sk, currency_code.
- Core measures (order level):
  - gross_items_amount: ∑(unit_price * quantity) (from silver.order_items)
  - item_discount_amount: ∑(item_discount)
  - order_discount_amount: ∑(silver.order_discounts.discount_amount)
  - tax_amount: from silver.taxes or silver.orders.tax_summary
  - shipping_fee: silver.shipments.shipping_fee_override first, then silver.orders.shipping_fee
  - refund_amount: ∑(silver.payments.refunded_amount)
  - net_amount: gross_items_amount - item_discount_amount - order_discount_amount + tax_amount + shipping_fee - refund_amount
  - payment_captured_amount: ∑(silver.payments.captured_amount)
  - payment_status: paid / partially_paid / unpaid (by comparing payment_captured_amount with net_amount)
  - fulfillment_status: shipped / partially_shipped / unshipped (based on silver.shipments)
  - status: order lifecycle status (created/paid/fulfilled/cancelled/returned), normalized from silver.orders.status and the event sequence
  - amount_usd: net_amount * fx_rate_to_usd (from dim_fx_rates, by order date and currency_code)
- Time fields:
  - order_created_at_utc, order_updated_at_utc
  - first_paid_at_utc, first_shipped_at_utc (if available)
  - load_ts (pipeline load time), record_version (idempotency version)
- Audit/source fields:
  - source_system, source_pk, op_ts, schema_version, is_deleted (soft delete)

7. Field-level lineage mapping (core fields)
- order_id ← oltp.orders.id
- order_no ← oltp.orders.order_no
- customer_sk ← dim_customer.surrogate_key, mapped from oltp.orders.customer_id
- store_sk ← dim_store.surrogate_key, mapped from oltp.orders.store_id
- channel_sk ← dim_channel.surrogate_key, mapped from oltp.orders.channel
- currency_code ← oltp.orders.currency
- order_created_at_utc ← convert(oltp.orders.created_at, source time zone → UTC)
- order_updated_at_utc ← convert(oltp.orders.updated_at, source time zone → UTC)
- gross_items_amount ← aggregate(silver.order_items.unit_price * silver.order_items.quantity by order_id)
- item_discount_amount ← aggregate(silver.order_items.item_discount by order_id)
- order_discount_amount ← aggregate(silver.order_discounts.discount_amount by order_id)
- tax_amount ← preferred: sum(silver.taxes.tax_amount) by order_id; fallback: silver.orders.tax_summary_total (the total expanded from tax_summary)
- shipping_fee ← COALESCE(silver.shipments.shipping_fee_override, silver.orders.shipping_fee, 0)
- refund_amount ← aggregate(silver.payments.refunded_amount by order_id)
- net_amount ← computed by the formula in section 6
- payment_captured_amount ← aggregate(silver.payments.captured_amount by order_id)
- payment_status ← CASE WHEN payment_captured_amount ≥ net_amount THEN 'paid' WHEN payment_captured_amount > 0 THEN 'partially_paid' ELSE 'unpaid' END
- fulfillment_status ← aggregated and normalized from silver.shipments.shipping_status
- status ← priority-based decision over silver.orders.status and cancel/refund events
- fx_rate_to_usd ← dim_fx_rates.rate_to_usd WHERE currency_code matches the order currency AND effective_date = DATE(order_created_at_utc)
- amount_usd ← net_amount * fx_rate_to_usd
- is_deleted ← set on a CDC delete, or on a business cancellation with no financial impact; historical versions are kept

8. Build example (SQL fragment)
- Order-level aggregation (Gold):

    with items as (
      select order_id,
             sum(unit_price * quantity)      as gross_items_amount,
             sum(coalesce(item_discount, 0)) as item_discount_amount
      from silver.order_items
      group by order_id
    ),
    discounts as (
      select order_id, sum(discount_amount) as order_discount_amount
      from silver.order_discounts
      group by order_id
    ),
    payments as (
      select order_id,
             sum(coalesce(captured_amount, 0)) as payment_captured_amount,
             sum(coalesce(refunded_amount, 0)) as refund_amount
      from silver.payments
      group by order_id
    ),
    shipments as (
      select order_id,
             -- max() on the raw status is lexical; apply the ordering logic before the enum mapping
             max(shipping_status)       as fulfillment_status,
             -- left NULL when no override exists, so the fallback to orders.shipping_fee can apply
             max(shipping_fee_override) as shipping_fee_override
      from silver.shipments
      group by order_id
    ),
    taxes as (
      -- one row per order; joining the tax detail directly would duplicate order rows
      select order_id, sum(tax_amount) as tax_amount
      from silver.taxes
      group by order_id
    ),
    fx as (
      select currency_code, effective_date, rate_to_usd
      from dim_fx_rates
    ),
    base as (
      select
        o.id, o.order_no, o.customer_id, o.store_id, o.channel, o.currency,
        o.created_at, o.updated_at, o.status,
        o.source_system, o.op_ts, o.schema_version,
        coalesce(i.gross_items_amount, 0)                    as gross_items_amount,
        coalesce(i.item_discount_amount, 0)                  as item_discount_amount,
        coalesce(d.order_discount_amount, 0)                 as order_discount_amount,
        coalesce(t.tax_amount, o.tax_summary_total, 0)       as tax_amount,
        coalesce(s.shipping_fee_override, o.shipping_fee, 0) as shipping_fee,
        coalesce(p.refund_amount, 0)                         as refund_amount,
        coalesce(p.payment_captured_amount, 0)               as payment_captured_amount,
        s.fulfillment_status                                 as raw_fulfillment_status
      from silver.orders o
      left join items     i on i.order_id = o.id
      left join discounts d on d.order_id = o.id
      left join payments  p on p.order_id = o.id
      left join shipments s on s.order_id = o.id
      left join taxes     t on t.order_id = o.id
    ),
    priced as (
      -- net amount is computed once and reused below
      select b.*,
             (b.gross_items_amount - b.item_discount_amount - b.order_discount_amount
              + b.tax_amount + b.shipping_fee - b.refund_amount) as net_amount
      from base b
    )
    insert into dw.gold.sales_orders
    select
      b.id as order_id,
      b.order_no,
      dc.customer_sk,
      ds.store_sk,
      dch.channel_sk,
      b.currency as currency_code,
      convert_to_utc(b.created_at) as order_created_at_utc,
      convert_to_utc(b.updated_at) as order_updated_at_utc,
      b.gross_items_amount,
      b.item_discount_amount,
      b.order_discount_amount,
      b.tax_amount,
      b.shipping_fee,
      b.refund_amount,
      b.net_amount,
      b.payment_captured_amount,
      case
        when b.payment_captured_amount >= b.net_amount then 'paid'
        when b.payment_captured_amount > 0 then 'partially_paid'
        else 'unpaid'
      end as payment_status,
      map_fulfillment_status(b.raw_fulfillment_status) as fulfillment_status,
      normalize_order_status(b.status, b.refund_amount) as status,
      fx.rate_to_usd as fx_rate_to_usd,
      b.net_amount * fx.rate_to_usd as amount_usd,
      current_timestamp() as load_ts,
      b.source_system,
      b.id as source_pk,
      b.op_ts,
      b.schema_version,
      false as is_deleted
    from priced b
    left join dim_customer dc on dc.natural_key_customer_id = b.customer_id and dc.is_current = true
    left join dim_store ds on ds.natural_key_store_id = b.store_id and ds.is_current = true
    left join dim_channel dch on dch.channel_code = b.channel and dch.is_current = true
    left join fx on fx.currency_code = b.currency
                and fx.effective_date = date(convert_to_utc(b.created_at));

- Note: normalize_order_status, map_fulfillment_status, and convert_to_utc are standardization UDFs/macros; the actual implementation must match the enterprise enum conventions.

9. Update and idempotency strategy
- Incremental build:
  - Process by CDC event window: first MERGE into silver, then recompute the matching order_id rows in gold from the set of changed orders.
- MERGE rules:
  - Primary key order_id; when the source event is a delete, or the order status is cancelled with no financial impact, set is_deleted and keep the historical record.
- Idempotency guarantee:
  - Deduplicate on tx_id + op_ts; a replay for the same order_id only updates record_version and the latest measures.

10. Data quality and audit
- Validation rules:
  - Unit price and quantity are non-negative; quantity is an integer.
  - The currency code is valid and has a same-day rate in dim_fx_rates.
  - Amount consistency: payment_captured_amount ≤ net_amount + refund_amount (net_amount already subtracts refunds); refund_amount ≤ payment_captured_amount.
  - Tax and shipping fee are non-negative.
  - The combination of order status and payment/fulfillment status is legal (for example, paid with net_amount = 0 is flagged as an anomaly).
- Audit fields:
  - load_ts, op_ts, record_version, lineage_run_id.
- Errors and backtracking:
  - All source tables keep the business key and source_pk, supporting rollback and recomputation.

11. Lineage and metadata management
- Metadata:
  - Record job dependencies (raw→silver→gold) in the orchestration platform.
  - Store field-level lineage in the data catalog/lineage system (for example, following the OpenLineage standard), including column-level input/output mappings.
- Change management:
  - Schema evolution goes through versioned schemas: new columns get safe defaults; dropped columns go through a deprecation period with compatibility handling in the transformation layer.

12. Storage and performance design
- Partitioning:
  - Partition by order_created_date; optionally add clustering keys (store_sk, status).
- Indexing/sorting:
  - Index order_id, customer_sk, and order_created_date for the query workload.
- Scale and precision:
  - Amount columns use DECIMAL(18, 2) to avoid binary floating-point error.
- Time:
  - Store everything in UTC; convert to the user's time zone in the presentation layer.

With this flow, every field in sales_orders can be traced to a specific upstream source table and transformation, giving complete lineage from the e-commerce transaction system (OLTP) to the gold layer of the warehouse. The lineage supports audit, quality control, recomputation, and backtracking, and meets the needs of order-level analysis and downstream reports and data products.
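The net_amount formula and the payment_status rule for sales_orders can be checked by hand with a small worked example. The sketch below is illustrative only: the function name `derive_order_measures` and the sample amounts are assumptions, not part of the pipeline. It uses `Decimal` in line with the DECIMAL(18, 2) guidance.

```python
from decimal import Decimal


def derive_order_measures(gross_items, item_discount, order_discount,
                          tax, shipping_fee, refund, captured):
    """Return (net_amount, payment_status) for one order.

    Hypothetical helper mirroring the net_amount formula and the
    payment_status CASE expression from the lineage description.
    """
    net = (gross_items - item_discount - order_discount
           + tax + shipping_fee - refund)
    if captured >= net:
        status = "paid"
    elif captured > 0:
        status = "partially_paid"
    else:
        status = "unpaid"
    return net, status


D = Decimal
# Made-up sample order: 200.00 of goods, 10.00 + 20.00 in discounts,
# 13.60 tax, 8.00 shipping, no refund.
amounts = (D("200.00"), D("10.00"), D("20.00"), D("13.60"), D("8.00"), D("0.00"))

net_full, status_full = derive_order_measures(*amounts, captured=D("191.60"))
net_part, status_part = derive_order_measures(*amounts, captured=D("100.00"))
net_none, status_none = derive_order_measures(*amounts, captured=D("0.00"))
print(net_full, status_full, status_part, status_none)
```

One consequence of the rule as written: an order with net_amount = 0 and nothing captured satisfies captured ≥ net and is classified as paid, which is exactly the combination the data quality rules treat as an anomaly.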
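The following describes the end-to-end data lineage from the master data management (MDM) system to the warehouse dimension table dim_cust. It covers ingestion, transformation, storage, and serving, and gives the key transformation rules with implementation examples.

Scope and goals
- Target object: dim_cust (customer dimension table, used by fact tables for customer joins and analysis)
- Design principles: the MDM "golden customer" master record is authoritative; SCD Type 2 manages attributes that need history; column-level and job-level lineage metadata are retained
- Keys:
  - Business key: mdm_customer_id (the unified customer key from MDM)
  - Surrogate key: cust_sk (generated inside the warehouse)
  - Time columns: effective_start_dt, effective_end_dt, current_flag

Lineage layers and flow
1) Source system (MDM)
- Main entity tables (example names; actual implementations may differ):
  - MDM_CUSTOMER: customer master (customer_id, name, type_code, status, effective_start_ts, effective_end_ts, source_system, record_version, etc.)
  - MDM_ADDRESS: addresses (customer_id, address_type, country_code, region, city, postal_code, is_primary, etc.)
  - MDM_CONTACT: contact details (customer_id, email, phone, is_primary, contact_status, etc.)
  - MDM_XREF (optional): cross-source mapping (customer_id to the original keys in each source system)
- Output mechanism: CDC (change data capture) is recommended, covering insert, update, and delete (soft/hard) events; where CDC is not possible, use daily snapshot files and compute the increment
2) Landing layer (Raw/Bronze)
- Ingestion: Kafka Connect + Debezium (or native database log subscription/change tables) streams change events from MDM_* into a message queue, then lands them in object storage/the data lake (Parquet/Avro)
- Representation:
  - raw_mdm_customer: same schema as the source, plus operation type (op_type), transaction time (op_ts), and source transaction key (src_tx_id)
  - raw_mdm_address, raw_mdm_contact: same as above
- Retention: kept as-is in an immutable archive, partitioned by ingestion_date, as the basis for audit and replay
3) Standardization layer (Staging/Silver)
- Goals: unified schema, normalized types, code mapping, master-record deduplication, rollup of parent and child entities
- Outputs:
  - stg_customer: the latest valid version per mdm_customer_id (based on effective_* and record_version), with normalized fields (for example, unified country codes and cleansed names)
  - stg_customer_contacts: primary email/phone resolution (prefer is_primary=true with contact_status=active)
  - stg_customer_address: primary address resolution (prefer address_type='PRIMARY' or is_primary=true)
- Change detection: to support SCD2 downstream, compute a change hash (hash_diff) over only the attributes that need history
4) Conformed layer (Conformed/Model)
- Merge the master record with the primary address and primary contact into a unified customer core view, cust_core
- Rules:
  - Single-value selection: when several primary markers conflict, pick one by priority (is_primary, most recent effective time, source trustworthiness)
  - Status filter: keep only customers that MDM marks as active and currently effective; inactive or expired records are retained so the SCD2 row can be closed
5) Dimension build layer (Warehouse/Gold)
- Target table: dim_cust
- SCD strategy:
  - Type 2 (history): customer name, type, country code, primary address, primary contact, and other attributes that matter for business analysis
  - Type 1 (overwrite): data quality tags and technical metadata (such as standardization flags), overwritten when necessary
- Merge logic:
  - New customer: insert a new dimension row (generate cust_sk, set current_flag=1)
  - Attribute change: close the old row (set effective_end_dt, current_flag=0) and insert a new row (current_flag=1)
  - Deactivation/deletion: based on the MDM status or a hard-delete event, close the current row and set end_reason
- Storage: columnar data warehouse (Snowflake/BigQuery/Redshift/Databricks Delta), partitioned by validity period or load date
6) Downstream consumption (marts/fact tables/services)
- Fact tables use cust_sk as the dimension foreign key
- Data services and reports (BI/customer service systems) read the customer master and historical snapshots through the dimension

Column-level lineage and transformation rules (examples)
- dim_cust.cust_sk: generated in the warehouse (sequence/IDENTITY/UUID); no upstream column
- dim_cust.mdm_customer_id: from MDM_CUSTOMER.customer_id (stg_customer.customer_id)
- dim_cust.customer_name: from MDM_CUSTOMER.name, after name cleansing (extra whitespace removed, case normalized)
- dim_cust.customer_type: from MDM_CUSTOMER.type_code, after code mapping (for example "P" → individual, "B" → business)
- dim_cust.primary_email: from stg_customer_contacts.email, with is_primary=true and contact_status=active
- dim_cust.primary_phone: from stg_customer_contacts.phone, same rule
- dim_cust.country_code: from stg_customer_address.country_code, corrected to ISO 3166-1 alpha-2
- dim_cust.address_line_1/…: from stg_customer_address.*, after address standardization (abbreviation expansion, postal code validation)
- dim_cust.is_active: from MDM_CUSTOMER.status in ('ACTIVE') and not expired (effective_end_ts is null or > business time)
- dim_cust.effective_start_dt/effective_end_dt/current_flag: generated by the SCD2 merge
- dim_cust.src_system: from MDM_CUSTOMER.source_system
- dim_cust.src_row_id: from MDM_CUSTOMER.source_row_id (if present), used to trace back to the original source system
- dim_cust.hash_diff: hash over the historized columns in a stable order (for example SHA256(name, type, country_code, address_hash, email, phone))
- dim_cust.load_dt/update_dt: ETL job timestamps
- dim_cust.end_reason (optional): 'INACTIVE'/'DELETED' and similar, derived from MDM events or status

SCD2 merge logic example (generic SQL pseudocode)
Note: MERGE and UPDATE ... FROM syntax differs between warehouses; the following shows the general approach.

    -- Change-detection set: currently effective MDM records with their hash_diff.
    -- Materialized as a temporary table because a CTE is only visible to the single
    -- statement that follows it, and both the UPDATE and the INSERT below read src.
    CREATE TEMPORARY TABLE src AS
    SELECT
      c.customer_id             AS mdm_customer_id,
      norm_name(c.name)         AS customer_name,
      map_type(c.type_code)     AS customer_type,
      a.country_code            AS country_code,
      a.address_line_1,
      a.address_line_2,
      ct.email                  AS primary_email,
      ct.phone                  AS primary_phone,
      c.status                  AS mdm_status,
      c.effective_start_ts      AS src_effective_start_ts,
      c.effective_end_ts        AS src_effective_end_ts,
      c.source_system           AS src_system,
      c.source_row_id           AS src_row_id,
      -- every argument is coalesced so a NULL cannot shift the field positions
      sha2(concat_ws('||',
        coalesce(norm_name(c.name), ''),
        coalesce(map_type(c.type_code), ''),
        coalesce(a.country_code, ''),
        coalesce(a.address_line_1, ''),
        coalesce(a.address_line_2, ''),
        coalesce(ct.email, ''),
        coalesce(ct.phone, '')
      ), 256)                   AS hash_diff,
      current_timestamp()       AS load_dt
    FROM stg_customer c
    LEFT JOIN stg_customer_address a
      ON a.customer_id = c.customer_id AND a.is_primary = true
    LEFT JOIN stg_customer_contacts ct
      ON ct.customer_id = c.customer_id AND ct.is_primary = true AND ct.contact_status = 'ACTIVE'
    WHERE (c.effective_end_ts IS NULL OR c.effective_end_ts > current_date());

    -- Close old versions whose attributes changed.
    -- A status-only change does not alter hash_diff, so deactivations that change
    -- nothing else must be handled by the deactivation step described below.
    UPDATE dim_cust AS d
    SET effective_end_dt = current_date(),
        current_flag = 0,
        update_dt = current_timestamp(),
        end_reason = CASE WHEN src.mdm_status <> 'ACTIVE' THEN 'INACTIVE' ELSE 'CHANGED' END
    FROM src
    WHERE d.mdm_customer_id = src.mdm_customer_id
      AND d.current_flag = 1
      AND d.hash_diff <> src.hash_diff;

    -- Insert new versions (new customers, or customers whose row was just closed)
    INSERT INTO dim_cust (
      cust_sk, mdm_customer_id, customer_name, customer_type, country_code,
      address_line_1, address_line_2, primary_email, primary_phone,
      is_active, effective_start_dt, effective_end_dt, current_flag,
      src_system, src_row_id, hash_diff, load_dt, update_dt
    )
    SELECT
      gen_surrogate_key(),          -- generated on the warehouse side
      s.mdm_customer_id, s.customer_name, s.customer_type, s.country_code,
      s.address_line_1, s.address_line_2, s.primary_email, s.primary_phone,
      CASE WHEN s.mdm_status = 'ACTIVE' THEN 1 ELSE 0 END,
      current_date(), NULL, 1,
      s.src_system, s.src_row_id, s.hash_diff, s.load_dt, s.load_dt
    FROM src s
    LEFT JOIN dim_cust d
      ON d.mdm_customer_id = s.mdm_customer_id AND d.current_flag = 1
    WHERE d.mdm_customer_id IS NULL        -- no current row: new customer or just-closed version
       OR d.hash_diff <> s.hash_diff;      -- attribute change (guard if the UPDATE was skipped)

Deletion/deactivation handling
- Soft delete or deactivation (MDM.status='INACTIVE' or src_effective_end_ts reached): close the current row and set end_reason='INACTIVE'
- Hard-delete event: close the current row and set end_reason='DELETED'; dimension history rows are never physically deleted

Data quality and governance
- Uniqueness: mdm_customer_id is globally unique; within dim_cust, mdm_customer_id is unique among rows with current_flag=1
- Completeness: required columns (mdm_customer_id, customer_name, customer_type, country_code) are non-null
- Code validation: type_code and country_code must exist in the maintained reference tables
- PII management: email/phone are marked sensitive; encrypt or mask in the landing layer; use column-level access control
- Audit columns: load_dt, update_dt, src_system, src_row_id, op_ts (may be kept in the raw layer)
- Traceable changes: keep raw-layer original events and stg-layer standardized snapshots; the dimension layer stores history rows

Metadata and lineage registration (OpenLineage job-level example, simplified; a spec-conformant run event also carries eventTime and schemaURL)

    {
      "eventType": "COMPLETE",
      "job": { "namespace": "dwh.customer", "name": "build_dim_cust_scd2" },
      "run": { "runId": "a1b2c3d4-e5f6-7890-1234-abcdef987654" },
      "inputs": [
        { "namespace": "lake.raw", "name": "raw_mdm_customer" },
        { "namespace": "lake.raw", "name": "raw_mdm_address" },
        { "namespace": "lake.raw", "name": "raw_mdm_contact" }
      ],
      "outputs": [
        { "namespace": "warehouse.gold", "name": "dim_cust" }
      ],
      "producer": "airflow://dags/build_dim_cust",
      "schemaVersion": "1-0-0"
    }

Scheduling and dependencies
- DAG order:
  - ingest_mdm_cdc → raw
  - standardize_mdm_customer → stg_customer/stg_address/stg_contacts
  - conform_customer_core → cust_core
  - scd2_dim_cust_merge → dim_cust
- Frequency: aligned with the MDM event frequency (near real time or batch); the dimension merge can run hourly or daily
- Retry on failure: replay from the raw layer, with an idempotent merge (keyed on mdm_customer_id and hash_diff)

Summary
- Lineage runs from MDM (MDM_CUSTOMER/ADDRESS/CONTACT) through Raw→Staging→Conformed→Warehouse (dim_cust), layer by layer
- Every column in dim_cust can be traced back to its MDM source column and its standardization/mapping rule
- Job-level lineage is registered through the orchestrator and OpenLineage/Atlas or similar, supporting end-to-end tracking and audit
- SCD2 manages the history of customer attributes, keeping fact tables and reports consistent and explainable as of any point in time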
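The SCD2 rules for dim_cust (compare hash_diff, close the current row, open a new one, ignore replays) can also be sketched outside SQL. The following is a minimal in-memory illustration, assuming rows are plain dictionaries; `TRACKED`, `hash_diff`, and `apply_scd2` are hypothetical names, the tracked columns are a subset chosen for brevity, and the surrogate key is a simple counter standing in for a warehouse-generated key.

```python
import hashlib
from datetime import date

# Attributes tracked as SCD Type 2 (an illustrative subset of dim_cust columns).
TRACKED = ("customer_name", "customer_type", "country_code",
           "primary_email", "primary_phone")


def hash_diff(row: dict) -> str:
    """SHA-256 over the tracked attributes in a fixed order; None becomes ''."""
    parts = ["" if row.get(col) is None else str(row[col]) for col in TRACKED]
    return hashlib.sha256("||".join(parts).encode("utf-8")).hexdigest()


def apply_scd2(dim: list, src: dict, today: date) -> None:
    """Merge one source record into dim (a list of dict rows), in place."""
    new_hash = hash_diff(src)
    current = next(
        (r for r in dim
         if r["mdm_customer_id"] == src["mdm_customer_id"]
         and r["current_flag"] == 1),
        None,
    )
    if current is not None:
        if current["hash_diff"] == new_hash:
            return  # unchanged: replaying the same record is a no-op
        # Attribute change: close the old version.
        current["effective_end_dt"] = today
        current["current_flag"] = 0
    # New customer or new version: open a current row.
    new_row = {col: src.get(col) for col in TRACKED}
    new_row.update(
        cust_sk=len(dim) + 1,  # stand-in for a warehouse-generated surrogate key
        mdm_customer_id=src["mdm_customer_id"],
        effective_start_dt=today,
        effective_end_dt=None,
        current_flag=1,
        hash_diff=new_hash,
    )
    dim.append(new_row)


dim = []
rec = {"mdm_customer_id": "C001", "customer_name": "Acme", "customer_type": "B",
       "country_code": "CN", "primary_email": "a@example.com",
       "primary_phone": None}
apply_scd2(dim, rec, date(2024, 1, 1))   # new customer
apply_scd2(dim, rec, date(2024, 1, 2))   # replay of the same record
apply_scd2(dim, {**rec, "primary_email": "b@example.com"}, date(2024, 1, 3))
```

Because the comparison is on hash_diff alone, replaying an unchanged record leaves the dimension untouched, which is the property the retry strategy relies on.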
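The following is based on the standard warehouse layering pattern (DIM/DWD/DWS/DM) and gives "entity-level" and "column-level" lineage for the bi_kpi table, starting from the layers inside the data warehouse. Because the actual table structures and metric definitions of your environment were not provided, this is a reference design and description template that can be put into practice. Replace the database/table names, metric rules, and grain fields with your own definitions.

1. Scope and grain
- Target table: dm.bi_kpi (example name)
- Grain: daily (biz_date). If extra dimensions are added (such as channel, platform, region), every related intermediate layer must stay aligned on the same dimension keys.
- Layer conventions:
  - DIM: dimension tables (such as dim_date, dim_user, dim_channel, dim_currency)
  - DWD: detail fact layer (such as fact_order, fact_payment, fact_refund, fact_user_event, fact_marketing_spend)
  - DWS: summary layer (subject-oriented daily wide tables, for example dws_order_metrics_day, dws_user_metrics_day, dws_marketing_metrics_day)
  - DM/ADS: data mart layer, served to BI/reports (bi_kpi)

2. Entity-level lineage (from the warehouse layers to bi_kpi)
DM layer
- dm.bi_kpi
  - Depends on:
    - dws.dws_order_metrics_day
    - dws.dws_user_metrics_day
    - dws.dws_marketing_metrics_day
    - Optional: dim.dim_date (to align calendar day/week/month or to fill gaps)
DWS layer
- dws.dws_order_metrics_day
  - Depends on:
    - dwd.fact_order (order creation/payment/completion; deduplication and status rules)
    - dwd.fact_payment (amounts actually received, payment method)
    - dwd.fact_refund (refund amounts initiated/completed)
    - dim.dim_date (business date, time zone)
    - Optional: dim.dim_currency (currencies and exchange rates, if multi-currency)
- dws.dws_user_metrics_day
  - Depends on:
    - dwd.fact_user_event (login/visit/active/conversion events, deduplicated)
    - dwd.fact_order (ordering/paying users, used for paying-user UV)
    - dim.dim_user (registration time and user attributes, SCD rules)
    - dim.dim_date
- dws.dws_marketing_metrics_day
  - Depends on:
    - dwd.fact_marketing_spend (ad spend)
    - dwd.fact_user_event (site arrivals/clicks/impressions, used for visitor UV and click counts)
    - dim.dim_channel, dim.dim_date
DWD layer (examples; upstream ODS/RAW not expanded)
- dwd.fact_order: order facts (order status, amount, time, user, product, channel key, etc.)
- dwd.fact_payment: payment facts (payment success time, amount, method, order key)
- dwd.fact_refund: refund facts (initiation/completion time, amount, order key)
- dwd.fact_user_event: user behavior facts (session/visit/click/login/order events)
- dwd.fact_marketing_spend: marketing spend facts (impressions/clicks/spend)

3. Column-level lineage (example metric mappings)
The following are column-level lineage examples for common KPIs; replace them with your actual definitions.
- bi_kpi.biz_date
  - Source: the biz_date partition key of each dws subject table, or dim_date.date_key
  - Rule: unified time zone and business-day definition (for example, calendar day in UTC+8)
- bi_kpi.gmv
  - Source: dws_order_metrics_day.gmv
  - dws source: sum(dwd.fact_order.order_amount) where order_status in ('PAID','SHIPPED','COMPLETED'); whether shipping and discounts are included depends on the metric definition
- bi_kpi.net_revenue (net revenue/amount received)
  - Source: dws_order_metrics_day.net_revenue
  - dws source: sum(dwd.fact_payment.pay_amount) - sum(dwd.fact_refund.refund_amount where refund_status='COMPLETED')
  - Rule: single-currency basis; multi-currency amounts are converted to the base currency using dim_currency rates
- bi_kpi.refund_amount
  - Source: dws_order_metrics_day.refund_amount
  - dws source: sum(dwd.fact_refund.refund_amount where refund_status='COMPLETED')
- bi_kpi.paying_users (number of paying users)
  - Source: dws_user_metrics_day.paying_users
  - dws source: count(distinct dwd.fact_payment.user_id)
- bi_kpi.active_users_7d (7-day active users)
  - Source: dws_user_metrics_day.active_users_7d
  - dws source: count(distinct user_id) on dwd.fact_user_event where event in ('login','view','click',...) and event_time in [biz_date-6, biz_date]
- bi_kpi.new_users (new users)
  - Source: dws_user_metrics_day.new_users
  - dws source: count(distinct dim_user.user_id where dim_user.first_login_date = biz_date)
- bi_kpi.visitors (visitor UV)
  - Source: dws_user_metrics_day.visitors
  - dws source: count(distinct visit_user_id) from dwd.fact_user_event where event='visit'
- bi_kpi.conversion_rate (conversion rate)
  - Source: computed in bi_kpi or provided by dws_user_metrics_day
  - Formula: paying_users / visitors; null or 0 when the denominator is 0
- Optional derived metrics
  - arpu = net_revenue / active_users
  - arppu = net_revenue / paying_users
  - refund_rate = refund_amount / sum(pay_amount)
  - repeat_purchase_rate = multi_pay_users / paying_users

4. Key transformations and metric control
- Deduplication and idempotency:
  - Orders, payments, and refunds are deduplicated by business key (such as order_id, payment_id, refund_id), preferring the latest event or the final state
- Late arrivals and corrections:
  - Support T+1/T+N late events; the dws layer backfills by business watermark and the dm layer is recomputable
- Time and time zone:
  - All fact tables are converted to the business time zone before landing in dws/dm; biz_date is the calendar day in the business time zone
- Dimension rules:
  - The user dimension dim_user uses an SCD type with point-in-time consistency (join to the dimension version valid at the event time)
- Currency and taxes:
  - Multi-currency amounts are converted by exchange rate; whether tax, shipping, and discounts count toward GMV/net revenue must be defined and fixed in the dws layer

5. Job dependencies and scheduling (DAG overview)
- dwd.fact_order, dwd.fact_payment, dwd.fact_refund, dwd.fact_user_event, dwd.fact_marketing_spend, dim.* -> dws.dws_order_metrics_day, dws.dws_user_metrics_day, dws.dws_marketing_metrics_day -> dm.bi_kpi
- Scheduling recommendations:
  - Incremental by partition: run T+1 per biz_date partition
  - Backfill: support date-range reruns; both dws and dm can be recomputed idempotently
  - Data watermark: trigger on the "latest time by which data is completely available" to avoid incomplete aggregates

6. Example SQL (simplified)
1) Order subject summary dws_order_metrics_day

    -- biz_date is supplied by the static partition spec, so it is not repeated in the select list.
    -- Payments and refunds are pre-aggregated per order: joining both detail tables directly
    -- to fact_order would multiply rows and inflate the sums.
    insert overwrite table dws.dws_order_metrics_day partition (biz_date='${ds}')
    select
      coalesce(o.channel_id, -1) as channel_id,
      sum(case when o.order_status in ('PAID','SHIPPED','COMPLETED') then o.order_amount else 0 end) as gmv,
      sum(coalesce(p.pay_amount, 0)) as pay_amount,
      sum(coalesce(r.refund_amount, 0)) as refund_amount,
      sum(coalesce(p.pay_amount, 0)) - sum(coalesce(r.refund_amount, 0)) as net_revenue
    from dwd.fact_order o
    left join (
      select order_id, sum(pay_amount) as pay_amount
      from dwd.fact_payment
      where biz_date='${ds}'
      group by order_id
    ) p on p.order_id = o.order_id
    left join (
      select order_id, sum(refund_amount) as refund_amount
      from dwd.fact_refund
      where biz_date='${ds}' and refund_status='COMPLETED'
      group by order_id
    ) r on r.order_id = o.order_id
    where o.biz_date='${ds}'
    group by coalesce(o.channel_id, -1);

2) User subject summary dws_user_metrics_day

    insert overwrite table dws.dws_user_metrics_day partition (biz_date='${ds}')
    select
      coalesce(e.channel_id, -1) as channel_id,
      count(distinct case when e.event='visit' then e.user_id end) as visitors,
      count(distinct case when e.event in ('login','view','click','order','pay') then e.user_id end) as active_users,
      count(distinct case when u.first_login_date='${ds}' then u.user_id end) as new_users,
      count(distinct pay.user_id) as paying_users
    from dwd.fact_user_event e
    left join dim.dim_user u on u.user_id = e.user_id
    left join dwd.fact_payment pay on pay.user_id = e.user_id and pay.biz_date='${ds}'
    where e.biz_date='${ds}'
    group by coalesce(e.channel_id, -1);

3) bi_kpi aggregation dm.bi_kpi

    -- Partition filters are applied inside the subqueries: in a full join, a filter placed
    -- in the ON clause does not remove rows, so other partitions would leak in as unmatched rows.
    insert overwrite table dm.bi_kpi partition (biz_date='${ds}')
    select
      coalesce(o.channel_id, u.channel_id) as channel_id,
      o.gmv, o.net_revenue, o.refund_amount,
      u.visitors, u.active_users, u.new_users, u.paying_users,
      case when u.visitors > 0 then u.paying_users * 1.0 / u.visitors else 0 end as conversion_rate,
      case when u.active_users > 0 then o.net_revenue * 1.0 / u.active_users else 0 end as arpu,
      case when u.paying_users > 0 then o.net_revenue * 1.0 / u.paying_users else 0 end as arppu
    from (select * from dws.dws_order_metrics_day where biz_date='${ds}') o
    full join (select * from dws.dws_user_metrics_day where biz_date='${ds}') u
      on o.channel_id = u.channel_id;

7. Data quality and lineage verification
- Grain uniqueness: dm.bi_kpi is unique within a partition on key(biz_date, channel_id, ...)
- Reconciliation checks:
  - sum(dm.net_revenue) matches sum(dws.pay_amount) - sum(dws.refund_amount)
  - paying_users <= active_users <= visitors (this holds only if every paying user also emits an active event and every active user also emits a visit event; relax the check otherwise)
- Nulls/anomalies:
  - Important fields are non-null (biz_date, gmv, net_revenue); ratios fall in [0,1]
- Latency monitoring:
  - Compare daily partition row counts against a threshold based on the 7-day average
- Lineage output:
  - Instrument the orchestration layer with OpenLineage/Marquez, or use DataHub/dbt docs, to expose upstream -> downstream relationships
  - Derive column-level lineage by SQL parsing; dbt docs and exposures render the model-level graph (dbt source freshness reports source staleness, not lineage)

8. Delivery recommendations
- Add a COMMENT to every table in the warehouse, recording the metric definition and upstream dependencies
- Maintain KPI definition versions in a single enum table to avoid inconsistent interpretations across teams
- Capture the "entity-level lineage" and "column-level lineage" from this document in the data catalog, both as a visual graph (DAG) and as documentation

The lineage description above covers the main upstream tables and key transformation rules for bi_kpi across the warehouse layers. Once the example table names and metric definitions are replaced with your own, it becomes an executable lineage description and implementation plan. If you need a lineage document that is exact down to the column and the SQL for your existing table structures, please provide the actual table list and field dictionary.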
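One pitfall worth checking when building dws_order_metrics_day is row fan-out: if an order has several payments and several refunds, joining both detail tables to the order in one query multiplies the rows before summing. The sketch below reproduces the effect with Python's built-in `sqlite3` module, used here purely for illustration rather than as the warehouse dialect; the three tiny tables and their values are made up, and only the table and column names follow the examples above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
create table fact_order   (order_id integer, order_amount real);
create table fact_payment (order_id integer, pay_amount real);
create table fact_refund  (order_id integer, refund_amount real, refund_status text);
insert into fact_order   values (1, 100.0);
insert into fact_payment values (1, 60.0), (1, 40.0);
insert into fact_refund  values (1, 10.0, 'COMPLETED'), (1, 5.0, 'COMPLETED');
""")

# Naive: 2 payments x 2 refunds = 4 joined rows, so both sums are doubled.
naive = conn.execute("""
select sum(p.pay_amount), sum(r.refund_amount)
from fact_order o
left join fact_payment p on p.order_id = o.order_id
left join fact_refund  r on r.order_id = o.order_id
                        and r.refund_status = 'COMPLETED'
""").fetchone()

# Safe: collapse each detail table to one row per order before joining.
safe = conn.execute("""
with pay as (
  select order_id, sum(pay_amount) as pay_amount
  from fact_payment
  group by order_id
),
rfd as (
  select order_id, sum(refund_amount) as refund_amount
  from fact_refund
  where refund_status = 'COMPLETED'
  group by order_id
)
select sum(coalesce(pay.pay_amount, 0)),
       sum(coalesce(rfd.refund_amount, 0)),
       sum(coalesce(pay.pay_amount, 0)) - sum(coalesce(rfd.refund_amount, 0))
from fact_order o
left join pay on pay.order_id = o.order_id
left join rfd on rfd.order_id = o.order_id
""").fetchone()

print("naive:", naive)
print("safe: ", safe)
```

The same reconciliation (summary net revenue against pre-aggregated payments minus refunds) is a cheap check to run on a sample partition before trusting the summary table.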
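Use it to map a given table's upstream and downstream dependencies and field relationships in one pass, quickly assess the impact of changes, fill gaps in lineage documentation, and guide job refactoring and releases.
Standardize lineage definitions and document format to support compliance audits, access approvals, and change reviews, and build up traceable descriptions of data assets.
See clearly where each metric comes from and how it is processed, locate the root cause of report anomalies, coordinate fixes, and cut cross-team back-and-forth.
When an incident occurs, trace along the pipeline, quickly judge the affected scope and priority, and produce a postmortem record and a prevention checklist.
Before a review, identify dependencies and risks, quantify the impact of requirement changes, and produce readable lineage descriptions that support decisions.
Export lineage descriptions on request, verify data flows and audit trails, raise the pass rate of inspections, and lower compliance risk.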
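Helps data teams and business stakeholders quickly understand where a table comes from, what happens to it along the way, and who ultimately uses it. Clear, standardized lineage descriptions speed up troubleshooting, support audit and compliance, and reduce cross-department communication cost, and they turn complex flows into content that can go straight into documentation, review materials, and tickets. The template supports choosing the starting system and target table and switching the output language and structure, which lowers the learning curve for newcomers, builds up team knowledge, and encourages more users to try and upgrade, so that governance, analysis, and delivery work keep benefiting.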
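Copy the prompt generated from the template into the chat application you normally use (such as ChatGPT or Claude) and start using it in conversation, with no extra development. Suited to quick individual trials and lightweight use.
Turn the prompt template into an API: your program can change any template parameter and call it directly through the interface, making automation and batch processing straightforward. Suited to developer integration and embedding in business systems.
Configure the corresponding server address in an MCP client so your AI application calls the prompt template automatically. Suited to advanced users and team collaboration, letting prompts move between different AI tools.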