How to Implement Stream-Based SCD Type 2 in Snowflake

Nasrul Hasan

Create an MD5 hash of all business attributes and only trigger SCD2 when the hash changes.

In Snowflake, this works beautifully with Streams + MERGE.

Let’s build the entire pipeline step-by-step.

Tables Setup

-- 1. This is where raw data lands
CREATE OR REPLACE TABLE SRC_CUSTOMER (
    CUSTOMER_ID STRING,
    NAME STRING,
    CITY STRING,
    UPDATED_AT TIMESTAMP
);


-- 2. This is the target table where we merge our new data
CREATE OR REPLACE TABLE DIM_CUSTOMER_SCD2 (
    CUSTOMER_SK NUMBER AUTOINCREMENT,
    CUSTOMER_ID STRING,
    NAME STRING,
    CITY STRING,
    HASH_MD5 STRING,        -- HASH of business columns
    VALID_FROM TIMESTAMP,
    VALID_TO TIMESTAMP,
    IS_CURRENT BOOLEAN
);

Create stream on source table

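This stream captures row-level changes (inserts, updates, and deletes) made to the source table since the stream was last consumed.

-- SHOW_INITIAL_ROWS = FALSE: only changes made after the stream is created are returned
CREATE OR REPLACE STREAM SRC_CUSTOMER_STREAM 
ON TABLE SRC_CUSTOMER
SHOW_INITIAL_ROWS = FALSE;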
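
To see what the stream records, you can insert a couple of rows and query it. This is a minimal sketch with made-up sample values. A plain SELECT on a stream does not advance its offset; only a DML statement that reads from it does.

```sql
-- Sample rows (illustrative values only)
INSERT INTO SRC_CUSTOMER (CUSTOMER_ID, NAME, CITY, UPDATED_AT)
VALUES
    ('C001', 'Alice', 'Pune',   '2024-01-01 10:00:00'),
    ('C002', 'Bob',   'Mumbai', '2024-01-01 10:05:00');

-- Inspect the pending changes along with the stream metadata columns
SELECT
    CUSTOMER_ID,
    NAME,
    CITY,
    METADATA$ACTION,     -- INSERT or DELETE
    METADATA$ISUPDATE,   -- TRUE when the row is one half of an UPDATE
    METADATA$ROW_ID
FROM SRC_CUSTOMER_STREAM;
```

An UPDATE on the source table shows up as a DELETE row plus an INSERT row, both with METADATA$ISUPDATE = TRUE.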

Generate the MD5 Hash (Snowflake Views)

In Snowflake we use MD5(TO_VARCHAR(...)) to hash the business attributes (the TO_VARCHAR cast is only needed for non-string columns). We create a view that computes the hash from the rows in the stream.

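-- Creates a hash based on attributes contributing to SCD2
CREATE OR REPLACE VIEW SRC_CUSTOMER_HASHED AS
SELECT
    CUSTOMER_ID,
    NAME,
    CITY,
    -- Normalize each attribute separately; COALESCE keeps a NULL attribute
    -- from turning the whole concatenation (and the hash) into NULL
    MD5(
        COALESCE(UPPER(TRIM(CUSTOMER_ID)), '') || '|' ||
        COALESCE(UPPER(TRIM(NAME)), '') || '|' ||
        COALESCE(UPPER(TRIM(CITY)), '')
    ) AS HASH_MD5,
    UPDATED_AT
FROM SRC_CUSTOMER_STREAM
-- An UPDATE appears in a standard stream as a DELETE + INSERT pair;
-- keep only the INSERT side so each change yields the new row image
WHERE METADATA$ACTION = 'INSERT';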
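
If several rows for the same customer can land between two task runs, the MERGE source would contain duplicate keys. One way to guard against that is to keep a single row per CUSTOMER_ID. The sketch below assumes UPDATED_AT identifies the latest version. The view name SRC_CUSTOMER_HASHED_LATEST is hypothetical and not part of the original pipeline.

```sql
-- Hypothetical helper view: latest row per customer from the hashed stream view
CREATE OR REPLACE VIEW SRC_CUSTOMER_HASHED_LATEST AS
SELECT
    CUSTOMER_ID,
    NAME,
    CITY,
    HASH_MD5,
    UPDATED_AT
FROM SRC_CUSTOMER_HASHED
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY CUSTOMER_ID
    ORDER BY UPDATED_AT DESC
) = 1;
```

With this in place, the MERGE would read from SRC_CUSTOMER_HASHED_LATEST instead. The trade-off is that intermediate versions arriving within one run are skipped.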

The Magic Merge

This is the heart of the pattern.

✔ Only expire & insert when the hash differs

(no need to check every column manually)

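CREATE OR REPLACE TASK TASK_RUN_SCD2_CUSTOMER
WAREHOUSE = COMPUTE_WH
SCHEDULE = '1 MINUTE'
AS
    MERGE INTO DIM_CUSTOMER_SCD2 tgt
    USING (
        -- Every incoming row, keyed so it can match the current version
        -- (and expire it if changed) or be inserted as a brand-new customer
        SELECT s.CUSTOMER_ID AS MERGE_KEY, s.CUSTOMER_ID, s.NAME, s.CITY, s.HASH_MD5
        FROM SRC_CUSTOMER_HASHED s
        UNION ALL
        -- Changed rows a second time with a NULL key: they never match,
        -- so the INSERT branch adds the new version in the same MERGE
        SELECT NULL AS MERGE_KEY, s.CUSTOMER_ID, s.NAME, s.CITY, s.HASH_MD5
        FROM SRC_CUSTOMER_HASHED s
        JOIN DIM_CUSTOMER_SCD2 cur
          ON cur.CUSTOMER_ID = s.CUSTOMER_ID
         AND cur.IS_CURRENT = TRUE
         AND cur.HASH_MD5 <> s.HASH_MD5
    ) src
    ON tgt.CUSTOMER_ID = src.MERGE_KEY
       AND tgt.IS_CURRENT = TRUE
    -- Expire the current version only when the hash differs
    WHEN MATCHED AND tgt.HASH_MD5 <> src.HASH_MD5 THEN
        UPDATE SET 
            tgt.IS_CURRENT = FALSE,
            tgt.VALID_TO = CURRENT_TIMESTAMP()
    -- New customers and new versions of changed customers
    WHEN NOT MATCHED THEN
        INSERT (
            CUSTOMER_ID,
            NAME,
            CITY,
            HASH_MD5,
            VALID_FROM,
            VALID_TO,
            IS_CURRENT
        )
        VALUES (
            src.CUSTOMER_ID,
            src.NAME,
            src.CITY,
            src.HASH_MD5,
            CURRENT_TIMESTAMP(),
            NULL,
            TRUE
        );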
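
A task is created in a suspended state, so nothing runs until it is resumed. Here is a short sketch of the follow-up commands, assuming your role has the privileges needed to operate tasks. The WHEN condition is an optional addition, not part of the task definition above.

```sql
-- Optional: skip scheduled runs when the stream has no pending changes
-- (a task must be suspended while it is modified; a new task already is)
ALTER TASK TASK_RUN_SCD2_CUSTOMER
    MODIFY WHEN SYSTEM$STREAM_HAS_DATA('SRC_CUSTOMER_STREAM');

-- Start the schedule
ALTER TASK TASK_RUN_SCD2_CUSTOMER RESUME;

-- Trigger a single run right away instead of waiting for the schedule
EXECUTE TASK TASK_RUN_SCD2_CUSTOMER;

-- Check the most recent runs and any error messages
SELECT NAME, STATE, SCHEDULED_TIME, ERROR_MESSAGE
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(
    TASK_NAME => 'TASK_RUN_SCD2_CUSTOMER'))
ORDER BY SCHEDULED_TIME DESC
LIMIT 5;
```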

Wrap-Up

Using a Snowflake Stream + MD5 hash + MERGE pattern gives you:

  • Fast change detection

  • Clean SCD2 logic

  • Fully incremental processing

  • A production-grade dimension architecture
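
Once the dimension is populated, reads usually come down to two query shapes: the current state and the state as of a point in time. This is a sketch with an illustrative timestamp:

```sql
-- Current version of every customer
SELECT CUSTOMER_ID, NAME, CITY, VALID_FROM
FROM DIM_CUSTOMER_SCD2
WHERE IS_CURRENT = TRUE;

-- Version of each customer as of a given moment (example timestamp)
SELECT CUSTOMER_ID, NAME, CITY
FROM DIM_CUSTOMER_SCD2
WHERE VALID_FROM <= '2024-06-01 00:00:00'::TIMESTAMP
  AND (VALID_TO > '2024-06-01 00:00:00'::TIMESTAMP OR VALID_TO IS NULL);
```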

Hit follow if you're interested in data engineering content.