Data Pipeline Automation in Snowflake
Emily Melhuish
Technical Curriculum Developer, Snowflake
CDC: Change Data Capture

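What a stream does

A stream records the row-level (DML) changes made to a source object after the stream's current offset. Downstream jobs can then process only what changed since the stream was last consumed.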

| Stream type | Supported on | Tracks | Best for |
|---|---|---|---|
| Standard | All table types and views, except external tables | All DML changes: inserts, updates, and deletes | Tables where any row can change (e.g. shipments) |
| Append-only | All table types and views, except external tables | Row inserts only | Insert-once tables (e.g. delivery events); more efficient than a standard stream |
| Insert-only | Externally managed Apache Iceberg tables and external tables | Row inserts only | External tables |
Directory tables surface file-level metadata for a stage (file name, size, last-modified timestamp).
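As a sketch of how a directory table fits into CDC, the statements below enable a directory table on a stage, refresh it, query its file metadata with the `DIRECTORY()` table function, and create a stream on the stage so added or removed files can be tracked like table rows. The stage and stream names are hypothetical, and an internal stage is assumed; an external stage would also need a URL and credentials or a storage integration.

```sql
-- Hypothetical internal stage with a directory table enabled
CREATE STAGE logistics.delivery_files_stage
  DIRECTORY = (ENABLE = TRUE);

-- Sync the directory table with the files currently in the stage
ALTER STAGE logistics.delivery_files_stage REFRESH;

-- File-level metadata: path, size, and last-modified timestamp
SELECT relative_path, size, last_modified
FROM DIRECTORY(@logistics.delivery_files_stage);

-- Hypothetical stream on the stage; it records changes to the directory table
CREATE STREAM logistics.delivery_files_stream
  ON STAGE logistics.delivery_files_stage;
```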
Standard stream on the shipments table
```sql
CREATE STREAM shipments_stream
  ON TABLE logistics.shipments;
```
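Before wiring a stream into a pipeline, it can help to check whether it has pending changes and whether it is at risk of going stale (a stream becomes stale when its offset falls outside the source table's data retention period). A minimal sketch using the stream created above:

```sql
-- TRUE if the stream currently holds change records to consume
SELECT SYSTEM$STREAM_HAS_DATA('shipments_stream');

-- The STALE and STALE_AFTER columns in the output show the stream's staleness status
SHOW STREAMS LIKE 'shipments_stream';
```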
Append-only stream on the delivery events table
```sql
CREATE STREAM delivery_events_stream
  ON TABLE logistics.delivery_events
  APPEND_ONLY = TRUE;
```
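The stream-type table also lists insert-only streams. A minimal sketch, assuming a hypothetical external table `logistics.carrier_feed_ext` already exists; streams on external tables must be created with `INSERT_ONLY = TRUE`.

```sql
-- Insert-only stream on a hypothetical external table:
-- it reports rows from newly added files; removed files are not reported as deletes
CREATE STREAM logistics.carrier_feed_stream
  ON EXTERNAL TABLE logistics.carrier_feed_ext
  INSERT_ONLY = TRUE;
```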
```sql
SELECT product, quantity, METADATA$ACTION, METADATA$ISUPDATE, METADATA$ROW_ID
FROM shipments_stream;
```
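Each stream row carries three metadata columns:
- METADATA$ACTION: the DML operation recorded, INSERT or DELETE
- METADATA$ISUPDATE: TRUE when the row is part of an update pair
- METADATA$ROW_ID: a unique, immutable identifier for the source row

An UPDATE is recorded as a pair of rows, a DELETE carrying the old values and an INSERT carrying the new values, both with METADATA$ISUPDATE = TRUE.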
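Because a standard stream records an update as a DELETE/INSERT pair, a common way to apply its changes to another table is a `MERGE` that discards the DELETE half of each update pair and then branches on `METADATA$ACTION`. This is a sketch under stated assumptions: the target table `logistics.shipments_reporting` is hypothetical, it is keyed on `shipment_id`, and both it and `logistics.shipments` are assumed to have the columns used in the task example. Reading the stream in a committed DML statement like this advances the stream's offset, so each change is applied once.

```sql
MERGE INTO logistics.shipments_reporting AS t
USING (
  -- Keep inserts, plain deletes, and the INSERT half of each update pair
  SELECT *
  FROM shipments_stream
  WHERE NOT (METADATA$ACTION = 'DELETE' AND METADATA$ISUPDATE)
) AS s
ON t.shipment_id = s.shipment_id
-- Row deleted in the source: remove it from the target
WHEN MATCHED AND s.METADATA$ACTION = 'DELETE' THEN
  DELETE
-- Row updated in the source: overwrite the target copy with the new values
WHEN MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
  UPDATE SET t.region = s.region,
             t.carrier = s.carrier,
             t.delivery_days = s.delivery_days
-- Row new to the target: add it
WHEN NOT MATCHED AND s.METADATA$ACTION = 'INSERT' THEN
  INSERT (shipment_id, region, carrier, delivery_days)
  VALUES (s.shipment_id, s.region, s.carrier, s.delivery_days);
```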


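Task that loads new staging rows into the shipments table every 5 minutes

```sql
-- Assumes a staging table named logistics.staging_shipments (hypothetical)
CREATE STREAM logistics.staging_shipments_stream
  ON TABLE logistics.staging_shipments
  APPEND_ONLY = TRUE;

-- The WHEN clause skips runs, without starting the warehouse, when the stream is empty;
-- reading the stream in the INSERT advances its offset
CREATE TASK logistics.sync_shipments
  WAREHOUSE = compute_wh
  SCHEDULE = '5 MINUTE'
  WHEN SYSTEM$STREAM_HAS_DATA('logistics.staging_shipments_stream')
AS
  INSERT INTO logistics.shipments
  SELECT shipment_id, region, carrier, delivery_days
  FROM logistics.staging_shipments_stream
  WHERE METADATA$ACTION = 'INSERT';
```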
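Snowflake creates tasks in a suspended state, so the sync task does not run until it is resumed. A minimal follow-up sketch: resume the task, optionally trigger a single run manually, and inspect recent runs. The history query assumes the current database is the one that contains the `logistics` schema.

```sql
-- Tasks start suspended; resume to begin the 5-minute schedule
ALTER TASK logistics.sync_shipments RESUME;

-- Optional: trigger one run now instead of waiting for the next scheduled time
EXECUTE TASK logistics.sync_shipments;

-- Most recent runs of this task (TASK_NAME takes an unqualified name)
SELECT name, state, scheduled_time, completed_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'SYNC_SHIPMENTS'))
ORDER BY scheduled_time DESC
LIMIT 10;
```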