以主键或业务唯一键为匹配键;对无更新标识字段的表,可使用哈希比对(MD5)检测变化。
-- Incremental upsert from raw.my_table into curated.my_table, matched on id.
-- Only rows whose _load_ts is at or after the latest processed_ts recorded for
-- 'my_table' in meta.load_watermark are read; with no watermark row the
-- COALESCE falls back to '1970-01-01', i.e. a full load.
-- NOTE(review): this statement does not advance meta.load_watermark —
-- presumably a separate step does; confirm.
MERGE INTO curated.my_table tgt
USING (
    SELECT *
    FROM raw.my_table
    WHERE _load_ts >= (
        SELECT COALESCE(MAX(processed_ts), '1970-01-01')
        FROM meta.load_watermark
        WHERE table_name = 'my_table'
    )
    -- Keep one row per key (the most recently loaded). Without this, several
    -- source rows for the same id either make the MERGE fail as
    -- nondeterministic (matched case, default ERROR_ON_NONDETERMINISTIC_MERGE)
    -- or are each inserted (unmatched case).
    QUALIFY ROW_NUMBER() OVER (PARTITION BY id ORDER BY _load_ts DESC) = 1
) src
ON tgt.id = src.id
WHEN MATCHED THEN UPDATE SET
    col1 = src.col1, col2 = src.col2, ...
WHEN NOT MATCHED THEN INSERT (id, col1, col2, ...) VALUES (src.id, src.col1, src.col2, ...);
可用Streams/Tasks自动化:
-- Change-tracking stream on raw.my_table.
-- IF NOT EXISTS rather than OR REPLACE: recreating an existing stream discards
-- its offset, so re-running this script would drop changes not yet consumed.
CREATE STREAM IF NOT EXISTS raw.my_table_stream
    ON TABLE raw.my_table;
-- Scheduled task that runs the MERGE against the rows exposed by
-- raw.my_table_stream, using warehouse WH_MEDIUM.
CREATE OR REPLACE TASK curated.merge_my_table
    WAREHOUSE = WH_MEDIUM
    SCHEDULE = 'USING CRON 0 * * * * UTC'  -- minute 0 of every hour, UTC
    -- Skip the run (and the warehouse resume) when the stream has no pending changes.
    WHEN SYSTEM$STREAM_HAS_DATA('raw.my_table_stream')
AS
    MERGE ... USING (SELECT * FROM raw.my_table_stream) src ...;
-- A newly created or replaced task is suspended; the schedule only takes effect after:
--   ALTER TASK curated.merge_my_table RESUME;
Snowflake文件格式与Stage
-- Named Parquet file format; the COPY statements in this file reference it via FORMAT_NAME.
CREATE OR REPLACE FILE FORMAT ff_parquet
    TYPE = PARQUET;
-- External stage over the S3 prefix s3://bucket/mysql/, accessed through the
-- aws_int storage integration.
CREATE OR REPLACE STAGE stg_mysql_parquet
    URL = 's3://bucket/mysql/'
    STORAGE_INTEGRATION = aws_int;
Snowpipe(事件触发自动装载)
-- Snowpipe that copies files under mydb/my_table/ on stage stg_mysql_parquet
-- into raw.my_table_cdc. AUTO_INGEST = TRUE lets storage event notifications
-- trigger the load.
-- NOTE(review): ON_ERROR = 'CONTINUE' skips rows that fail to load instead of
-- failing the file — confirm that silently dropped change rows are acceptable.
CREATE OR REPLACE PIPE pipe_my_table
    AUTO_INGEST = TRUE
AS
    COPY INTO raw.my_table_cdc
        FROM @stg_mysql_parquet/mydb/my_table/
        FILE_FORMAT = (FORMAT_NAME = ff_parquet)
        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE  -- map file columns to table columns by name, ignoring case
        ON_ERROR = 'CONTINUE';
配置S3事件通知到Snowflake(通知目标为该PIPE的通知通道ARN,可通过 SHOW PIPES 的 notification_channel 列获取);Snowpipe基于文件名去重,同名文件在加载元数据保留期(14天)内不会被重复装载。
增量应用(CDC MERGE)
-- Apply CDC rows from raw.my_table_cdc to curated.my_table (template: the
-- UPDATE / INSERT column lists are left as placeholders).
-- Only the row with the greatest source_ts per id is used. op = 'D' deletes the
-- matching target row; 'U' / 'I' update a matching row or insert a missing one.
-- A 'D' for an id absent from the target satisfies no clause and is ignored.
MERGE INTO curated.my_table tgt
USING (
    SELECT *
    FROM raw.my_table_cdc
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY source_ts DESC
    ) = 1
) src
    ON tgt.id = src.id
WHEN MATCHED AND src.op = 'D' THEN
    DELETE
WHEN MATCHED AND src.op IN ('U','I') THEN
    UPDATE SET ...
WHEN NOT MATCHED AND src.op IN ('I','U') THEN
    INSERT (...);
COPY到原始CDC表:
-- Load the Parquet files under mydb/orders/ on stage stg_mysql_parquet into
-- raw.orders_cdc, mapping file columns to table columns by case-insensitive name.
-- NOTE(review): ON_ERROR = 'CONTINUE' skips rows that fail to load — confirm
-- that this is acceptable for change data.
COPY INTO raw.orders_cdc
    FROM @stg_mysql_parquet/mydb/orders/
    FILE_FORMAT = (FORMAT_NAME = ff_parquet)
    MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    ON_ERROR = 'CONTINUE';
MERGE:
-- Apply CDC rows from raw.orders_cdc to curated.orders, matched on id.
-- The source keeps only the row with the greatest source_ts per id.
-- op = 'D' deletes the matching row; 'U' / 'I' overwrite col1 and col2 on a
-- match or insert the row when the id is missing. A 'D' with no matching
-- target row satisfies no clause and is ignored.
MERGE INTO curated.orders tgt
USING (
    SELECT
        id,
        col1,
        col2,
        op,
        source_ts
    FROM raw.orders_cdc
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY id
        ORDER BY source_ts DESC
    ) = 1
) src
    ON tgt.id = src.id
WHEN MATCHED AND src.op = 'D' THEN
    DELETE
WHEN MATCHED AND src.op IN ('U','I') THEN
    UPDATE SET
        col1 = src.col1,
        col2 = src.col2
WHEN NOT MATCHED AND src.op IN ('I','U') THEN
    INSERT (id, col1, col2)
    VALUES (src.id, src.col1, src.col2);
DDL示例:
-- Create target_table partitioned by the date of event_ts and clustered by region.
-- The WHERE 1=0 filter selects no rows, so only the column schema of
-- _empty_source is carried over.
CREATE TABLE project.dataset.target_table
    PARTITION BY DATE(event_ts)
    CLUSTER BY region
AS
SELECT *
FROM project.dataset._empty_source
WHERE 1=0;
(或使用 bq mk --table --schema schema.json 创建表,并通过 --time_partitioning_field、--clustering_fields 等参数或API设置分区/聚簇)
若存在主键 id,增量合并示例:
-- Incremental upsert from staging_table into target_table, matched on id.
-- The source is reduced to exactly one row per id. SELECT DISTINCT * would only
-- remove fully identical rows, so two staging rows sharing an id but differing
-- in any column would still make the MERGE fail, because a target row may
-- match at most one source row.
-- NOTE(review): the row with the latest event_ts is kept per id — confirm that
-- "newest event wins" is the intended rule for this table.
MERGE project.dataset.target_table T
USING (
  SELECT id, amount, event_ts, region
  FROM (
    SELECT
      id,
      amount,
      event_ts,
      region,
      ROW_NUMBER() OVER (PARTITION BY id ORDER BY event_ts DESC) AS rn
    FROM project.dataset.staging_table
  )
  WHERE rn = 1
) S
ON T.id = S.id
WHEN MATCHED THEN
  UPDATE SET amount = S.amount, event_ts = S.event_ts, region = S.region
WHEN NOT MATCHED THEN
  INSERT (id, amount, event_ts, region) VALUES (S.id, S.amount, S.event_ts, S.region);