Data Pipeline Automation in Snowflake
Emily Melhuish
Technical Curriculum Developer, Snowflake



About Harbr

Stage = temporary storage location

Internal Stages
External Stage
Directory Table
CREATE STAGE harbr_stage
DIRECTORY = (ENABLE = TRUE);
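Once the directory table is enabled, the files on the stage can be listed with an ordinary query. The following is a minimal sketch that assumes the `harbr_stage` stage created above. Whether a manual refresh is needed depends on how files reach the stage, so treat that step as optional.

```sql
-- Sync the directory table metadata with the files currently on the stage.
ALTER STAGE harbr_stage REFRESH;

-- Query the directory table like a regular table.
SELECT relative_path, size, last_modified
FROM DIRECTORY(@harbr_stage)
ORDER BY last_modified DESC;
```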
Internal Stage Encryption
ENCRYPTION = (TYPE = 'SNOWFLAKE_FULL')
ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
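As a sketch of how the two internal encryption types might appear in full statements, the example below creates one stage of each kind. The stage names `harbr_full_stage` and `harbr_sse_stage` are hypothetical and used only for illustration.

```sql
-- Hypothetical stage names, for illustration only.

-- Client-side plus server-side encryption.
CREATE STAGE harbr_full_stage
  ENCRYPTION = (TYPE = 'SNOWFLAKE_FULL');

-- Server-side encryption only.
CREATE STAGE harbr_sse_stage
  ENCRYPTION = (TYPE = 'SNOWFLAKE_SSE')
  DIRECTORY = (ENABLE = TRUE);
```

The encryption type is chosen when the stage is created, so it is worth deciding before any files are uploaded.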
External Stage Encryption
ENCRYPTION =
([ TYPE = 'AWS_CSE' ] MASTER_KEY = '')
ENCRYPTION =
([ TYPE = 'AWS_SSE_S3' ])
...
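For an external stage on Amazon S3, the encryption clause sits alongside the bucket URL and the credentials or integration. This is a minimal sketch: the stage name, bucket path, and storage integration name are all assumptions and would need to match objects in your own account.

```sql
-- Hypothetical bucket, integration, and stage names; replace with your own.
CREATE STAGE harbr_s3_stage
  URL = 's3://example-bucket/shipments/'
  STORAGE_INTEGRATION = harbr_s3_integration
  ENCRYPTION = (TYPE = 'AWS_SSE_S3');
```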
Semi-structured formats: JSON, Parquet, Avro, ORC and XML.
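Semi-structured files are typically loaded into a single VARIANT column and unpacked at query time. The sketch below assumes a hypothetical `logistics.shipment_events` table, a hypothetical `events/` path on the stage, and made-up JSON field names.

```sql
-- Hypothetical table, stage path, and field names, for illustration only.
CREATE TABLE logistics.shipment_events (payload VARIANT);

-- Each JSON document lands in one row of the VARIANT column.
COPY INTO logistics.shipment_events
FROM @harbr_internal_stage/events/
FILE_FORMAT = (TYPE = 'JSON');

-- Fields are addressed by path and cast to a SQL type at query time.
SELECT
  payload:shipment_id::STRING AS shipment_id,
  payload:status::STRING      AS status
FROM logistics.shipment_events;
```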
VARIANT column
Update the format object once
SQL
CREATE FILE FORMAT harbr_csv_format
TYPE = 'CSV'
FIELD_DELIMITER = ','
SKIP_HEADER = 1;
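The benefit of a named file format is that a change is made in one place. As a sketch, assuming the source files switch from commas to pipes, the format object could be altered once and every later load that references it by name would use the new setting. The pipe delimiter is only an example value.

```sql
-- Change the delimiter in one place.
ALTER FILE FORMAT harbr_csv_format SET FIELD_DELIMITER = '|';

-- Confirm the current settings of the format object.
DESCRIBE FILE FORMAT harbr_csv_format;
```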
Load from a named stage using a named file format
COPY INTO logistics.shipments
FROM @harbr_internal_stage/shipments/
FILE_FORMAT = (FORMAT_NAME = 'harbr_csv_format');
| ON_ERROR option | Behavior |
|---|---|
| ABORT_STATEMENT | Fail the entire load on the first error (default for bulk COPY INTO) |
| CONTINUE | Skip bad rows, load everything else |
| SKIP_FILE | Skip the entire file if it contains any errors |
| SKIP_FILE_<num> | Skip the file only if its error count reaches or exceeds the specified number |
| SKIP_FILE_<num>% | Skip the file only if its error percentage exceeds the specified threshold |
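As an illustration of how one of these options is applied, the sketch below reuses the stage, table, and file format from the earlier load and skips any file in which more than 10% of rows fail. The 10% threshold is an arbitrary example value.

```sql
COPY INTO logistics.shipments
FROM @harbr_internal_stage/shipments/
FILE_FORMAT = (FORMAT_NAME = 'harbr_csv_format')
-- The percentage form is written as a quoted string.
ON_ERROR = 'SKIP_FILE_10%';
```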
Validate without loading = dry run
COPY INTO logistics.shipments
FROM @harbr_internal_stage/shipments/
FILE_FORMAT = (FORMAT_NAME = 'harbr_csv_format')
VALIDATION_MODE = RETURN_ERRORS;
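Two related checks may be useful alongside RETURN_ERRORS. The sketch below previews a sample of rows before loading, then inspects rejected rows after a real load. It assumes the same stage, table, and file format as above, and the row count of 10 is an arbitrary example.

```sql
-- Preview the first 10 rows as they would be parsed, without loading them.
COPY INTO logistics.shipments
FROM @harbr_internal_stage/shipments/
FILE_FORMAT = (FORMAT_NAME = 'harbr_csv_format')
VALIDATION_MODE = RETURN_10_ROWS;

-- After a real load, list the errors from the most recent COPY in this session.
SELECT * FROM TABLE(VALIDATE(logistics.shipments, JOB_ID => '_last'));
```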
Inspect load history
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'shipments',
START_TIME => DATEADD('hour', -24,
CURRENT_TIMESTAMP())));
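The same history query can be narrowed to the files that had problems. This is a minimal sketch that selects a few of the columns returned by COPY_HISTORY and keeps only files with at least one error.

```sql
SELECT file_name, status, row_count, row_parsed, error_count, first_error_message
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
  TABLE_NAME => 'shipments',
  START_TIME => DATEADD('hour', -24, CURRENT_TIMESTAMP())))
-- Keep only files where at least one row was rejected.
WHERE error_count > 0
ORDER BY last_load_time DESC;
```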