Data Pipeline Automation in Snowflake
Emily Melhuish
Technical Curriculum Developer, Snowflake
Example:
-- Before: today's manual routine (fragile) -- the procedure only runs when someone calls it by hand.
CALL logistics.refresh_ops_dashboard();

-- After: the same procedure call wrapped in a task scheduled daily at 06:00 UTC.
-- CREATE OR REPLACE keeps the script re-runnable (plain CREATE TASK errors if the task already exists).
CREATE OR REPLACE TASK logistics.refresh_dashboard
    WAREHOUSE = harbr_wh
    SCHEDULE = 'USING CRON 0 6 * * * UTC'
AS
    CALL logistics.refresh_ops_dashboard();

-- Tasks are created in a suspended state; resume it so the schedule actually takes effect.
ALTER TASK logistics.refresh_dashboard RESUME;
-- Scheduled copy of unprocessed delivery events into delivery_summary.
-- Cron '5 * * * *' means minute 5 of every hour (HH:05 UTC), not "every 5 minutes".
-- The explicit column list maps values by name instead of by delivery_summary's column
-- position, and keeps the INSERT valid if that table later gains nullable/defaulted columns.
-- NOTE(review): nothing in this task sets processed = TRUE, so the same rows are re-inserted
-- on every run unless some other job flips the flag. A stream-driven task
-- (WHEN SYSTEM$STREAM_HAS_DATA + INSERT ... SELECT FROM <stream>) processes each change once.
CREATE OR REPLACE TASK transform_delivery_summary
    WAREHOUSE = harbr_wh
    SCHEDULE = 'USING CRON 5 * * * * UTC'
AS
    INSERT INTO delivery_summary (shipment_id, status, updated_at)
    SELECT
        shipment_id,
        status,
        updated_at
    FROM delivery_events
    WHERE processed = FALSE;
Warehouse-based
-- Warehouse-based task: compute comes from the named warehouse given in WAREHOUSE,
-- and the task body runs every 5 minutes.
CREATE TASK my_task
    WAREHOUSE = harbr_wh
    SCHEDULE  = '5 MINUTE'
AS
    INSERT INTO ...;
Serverless
-- Serverless task: there is deliberately no WAREHOUSE clause, so Snowflake supplies
-- and manages the compute for each run.
CREATE TASK my_serverless_task
    SCHEDULE = '5 MINUTE'
AS
    INSERT INTO ...;

AFTER
-- Activate (resume) a task: tasks are created suspended, so this is needed before the schedule fires.
ALTER TASK mytask RESUME;

-- Pause (suspend) a task: no further scheduled runs until it is resumed again.
ALTER TASK mytask SUSPEND;

-- Execute a task: trigger a single run right now, independent of its schedule.
-- EXECUTE TASK takes only the task name -- no RESUME/SUSPEND keyword follows it.
EXECUTE TASK mytask;

-- Stream-driven task: every 5 minutes the WHEN condition checks whether the stream holds
-- change records; if not, the run is skipped without starting the warehouse.
-- Consuming the stream in the INSERT (a DML statement) advances its offset, so each change
-- is picked up once.
-- Explicit columns instead of SELECT *: selecting * from a stream also returns
-- METADATA$ACTION, METADATA$ISUPDATE and METADATA$ROW_ID, which a plain copy of the
-- source rows does not want.
-- NOTE(review): assumes the stream exposes shipment_id, status, updated_at (as delivery_events
-- does) and processed_events has those columns -- confirm against the actual DDL. If the source
-- table also receives updates/deletes, consider filtering on METADATA$ACTION = 'INSERT'.
CREATE OR REPLACE TASK logistics.process_delivery_events
    WAREHOUSE = compute_wh
    SCHEDULE = '5 MINUTE'
    WHEN SYSTEM$STREAM_HAS_DATA('logistics.delivery_events_stream')
AS
    INSERT INTO logistics.processed_events (shipment_id, status, updated_at)
    SELECT
        shipment_id,
        status,
        updated_at
    FROM logistics.delivery_events_stream;
Data Pipeline Automation in Snowflake