Automate data stream ingestion into a Snowflake database by using Snowflake Snowpipe, Amazon S3, Amazon SNS, and Amazon Data Firehose
Created by Bikash Chandra Rout (AWS)
Environment: PoC or pilot | Technologies: Storage & backup |
Summary
This pattern describes how you can use services on the Amazon Web Services (AWS) Cloud to process a continuous stream of data and load it into a Snowflake database. The pattern uses Amazon Data Firehose to deliver the data to Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS) to send notifications when new data is received, and Snowflake Snowpipe to load the data into a Snowflake database.
By following this pattern, you can have continuously generated data available for analysis in seconds, avoid multiple manual COPY commands, and have full support for semi-structured data on load.
Prerequisites and limitations
Prerequisites
An active AWS account.
A data source that is continuously sending data to a Firehose delivery stream.
An existing S3 bucket that is receiving the data from the Firehose delivery stream.
An active Snowflake account.
Limitations
Snowflake Snowpipe doesn't connect directly to Firehose.
Architecture
Technology stack
Amazon Data Firehose
Amazon SNS
Amazon S3
Snowflake Snowpipe
Snowflake database
Tools
Firehose – Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon OpenSearch Service, Splunk, and any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers.
Amazon S3 – Amazon Simple Storage Service (Amazon S3) is storage for the internet.
Amazon SNS – Amazon Simple Notification Service (Amazon SNS) coordinates and manages the delivery or sending of messages to subscribing endpoints or clients.
Snowflake – Snowflake is an analytic data warehouse provided as Software-as-a-Service (SaaS).
Snowflake Snowpipe – Snowpipe loads data from files as soon as they’re available in a Snowflake stage.
Epics
Task | Description | Skills required |
---|---|---|
Create a CSV file format in Snowflake. | Sign in to Snowflake and run the “CREATE FILE FORMAT” command to create a CSV file format with a specified field delimiter. For more information about this and other Snowflake commands, see the “Additional information” section. | Developer |
Create an external Snowflake stage. | Run the “CREATE STAGE” command to create an external Snowflake stage that references the CSV file format you created earlier. Important: You will need the URL for the S3 bucket, your AWS access key, and your AWS secret access key. Run the “SHOW STAGES” command to verify that the Snowflake stage was created. | Developer |
Create the Snowflake target table. | Run the “CREATE TABLE” command to create the Snowflake table. | Developer |
Create a pipe. | Run the “CREATE PIPE” command; make sure that “auto_ingest=true” is set in the command. Run the “SHOW PIPES” command to verify that the pipe was created. Copy and save the “notification_channel” column value. This value will be used to configure Amazon S3 event notifications. | Developer |
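The four Snowflake tasks above can be sketched as one script. This is a minimal sketch, not the pattern's exact DDL: the identifiers my_csv_format, my_stage, my_table, my_pipe, the column list, and the bucket placeholder are all hypothetical, and the cursor is assumed to come from a DB-API style client such as snowflake-connector-python.

```python
# Sketch of the Snowflake setup steps: file format, stage, table, pipe.
# All object names below are hypothetical placeholders; replace them
# with your own identifiers and your S3 bucket URL.

SETUP_STATEMENTS = [
    # 1. CSV file format with a pipe field delimiter
    "CREATE FILE FORMAT my_csv_format TYPE = 'CSV' "
    "FIELD_DELIMITER = '|' SKIP_HEADER = 1;",
    # 2. External stage pointing at the S3 bucket that Firehose writes to
    "CREATE STAGE my_stage "
    "URL = 's3://<your-bucket>/' "
    "CREDENTIALS = (AWS_KEY_ID = '<key>' AWS_SECRET_KEY = '<secret>') "
    "FILE_FORMAT = my_csv_format;",
    # 3. Target table for the streamed records (example columns)
    "CREATE TABLE my_table (event_time TIMESTAMP, payload VARCHAR);",
    # 4. Pipe with auto_ingest=true so S3 event notifications trigger loads
    "CREATE PIPE my_pipe AUTO_INGEST = TRUE AS "
    "COPY INTO my_table FROM @my_stage;",
]

def run_setup(cursor):
    """Execute each setup statement with a DB-API style cursor
    (for example, one from snowflake-connector-python)."""
    for statement in SETUP_STATEMENTS:
        cursor.execute(statement)
```

After the pipe is created, run “SHOW PIPES” and record the “notification_channel” value; it is needed later when you configure the S3 event notification.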
Task | Description | Skills required |
---|---|---|
Create a 30-day lifecycle policy for the S3 bucket. | Sign in to the AWS Management Console and open the Amazon S3 console. Choose the S3 bucket that contains the data from Firehose. Then choose the “Management” tab in the S3 bucket and choose “Add lifecycle rule.” Enter a name for your rule in the “Lifecycle rule” dialog box and configure a 30-day lifecycle rule for your bucket. For help with this and other stories, see the “Related resources” section. | System Administrator, Developer |
Create an IAM policy for the S3 bucket. | Open the AWS Identity and Access Management (IAM) console and choose “Policies.” Choose “Create policy,” and then choose the “JSON” tab. Copy and paste the policy from the “Additional information” section into the JSON field. This policy grants “PutObject” and “DeleteObject” permissions, as well as “GetObject,” “GetObjectVersion,” and “ListBucket” permissions. Choose “Review policy,” enter a policy name, and then choose “Create policy.” | System Administrator, Developer |
Assign the policy to an IAM role. | Open the IAM console, choose “Roles,” and then choose “Create role.” Choose “Another AWS account” as the trusted entity. Enter your AWS account ID, and choose “Require external ID.” Enter a placeholder ID that you will change later. Choose “Next” and assign the IAM policy you created earlier. Then create the IAM role. | System Administrator, Developer |
Copy the Amazon Resource Name (ARN) for the IAM role. | Open the IAM console and choose “Roles.” Choose the IAM role you created earlier, and then copy and store the “Role ARN.” | System Administrator, Developer |
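The 30-day lifecycle rule from the first task in this epic can also be applied programmatically. This is a minimal sketch, assuming boto3 is available and credentials are configured; the bucket name my-firehose-bucket and the rule ID expire-firehose-objects are hypothetical placeholders.

```python
def build_lifecycle_config(expiration_days=30):
    """Build the payload for an expiration rule, in the shape expected
    by the S3 put_bucket_lifecycle_configuration API."""
    return {
        "Rules": [
            {
                "ID": "expire-firehose-objects",  # hypothetical rule name
                "Filter": {"Prefix": ""},          # apply to the whole bucket
                "Status": "Enabled",
                "Expiration": {"Days": expiration_days},
            }
        ]
    }

if __name__ == "__main__":
    # Requires boto3 and AWS credentials; the bucket name is a placeholder.
    import boto3

    s3 = boto3.client("s3")
    s3.put_bucket_lifecycle_configuration(
        Bucket="my-firehose-bucket",
        LifecycleConfiguration=build_lifecycle_config(),
    )
```

The rule deletes objects 30 days after creation, which keeps the staging bucket from growing without bound once Snowpipe has loaded the files.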
Task | Description | Skills required |
---|---|---|
Create a storage integration in Snowflake. | Sign in to Snowflake and run the “CREATE STORAGE INTEGRATION” command. This will modify the trusted relationship, grant access to Snowflake, and provide the external ID for your Snowflake stage. | System Administrator, Developer |
Retrieve the IAM role for your Snowflake account. | Run the “DESC INTEGRATION” command to retrieve the ARN for the IAM role. Important: <integration_name> is the name of the Snowflake storage integration you created earlier. | System Administrator, Developer |
Record two column values. | Copy and save the values for the “storage_aws_iam_user_arn” and “storage_aws_external_id” columns. | System Administrator, Developer |
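The two values to record can be picked out of the “DESC INTEGRATION” result set programmatically. This is a minimal sketch, assuming the rows come back as (property, property_type, property_value, property_default) tuples, which is the shape a Snowflake cursor returns for this command.

```python
def extract_integration_values(desc_rows):
    """Pick out the two values to record from DESC INTEGRATION output.

    desc_rows is a sequence of (property, property_type, property_value,
    property_default) tuples, as returned by a Snowflake cursor.
    """
    wanted = {"STORAGE_AWS_IAM_USER_ARN", "STORAGE_AWS_EXTERNAL_ID"}
    return {
        prop: value
        for prop, _ptype, value, _default in desc_rows
        if prop in wanted
    }
```

Save both values; they are substituted into the IAM role's trust policy in the next epic.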
Task | Description | Skills required |
---|---|---|
Modify the IAM role policy. | Open the IAM console and choose “Roles.” Choose the IAM role you created earlier and choose the “Trust relationships” tab. Choose “Edit trust relationship.” Replace “snowflake_external_id” with the “storage_aws_external_id” value you copied earlier. Replace “snowflake_user_arn” with the “storage_aws_iam_user_arn” value you copied earlier. Then choose “Update trust policy.” | System Administrator, Developer |
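The resulting trust policy can be rendered from the two recorded values. This is a minimal sketch of the trust document the role ends up with; build_trust_policy is a hypothetical helper, and the ARN and external ID passed in are the values copied from “DESC INTEGRATION.”

```python
def build_trust_policy(snowflake_user_arn, snowflake_external_id):
    """Render the IAM trust policy for the Snowflake storage integration,
    substituting the values recorded from DESC INTEGRATION."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                # The IAM user that Snowflake created for your account
                "Principal": {"AWS": snowflake_user_arn},
                "Action": "sts:AssumeRole",
                # The external ID ties the role to your storage integration
                "Condition": {
                    "StringEquals": {
                        "sts:ExternalId": snowflake_external_id
                    }
                },
            }
        ],
    }
```

This is the same edit the console steps perform: the Snowflake-side IAM user becomes the trusted principal, and the external ID replaces the placeholder you entered when creating the role.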
Task | Description | Skills required |
---|---|---|
Turn on event notifications for the S3 bucket. | Open the Amazon S3 console and choose your bucket. Choose “Properties,” and under “Advanced settings” choose “Events.” Choose “Add notification,” and enter a name for this event. If you don't enter a name, a globally unique identifier (GUID) will be used. | System Administrator, Developer |
Configure Amazon SNS notifications for the S3 bucket. | Under “Events,” choose “ObjectCreate (All),” and then choose “SQS Queue” in the “Send to” dropdown list. In the “SQS” list, choose “Add SQS queue ARN,” and paste the “notification_channel” value you copied earlier. Then choose “Save.” | System Administrator, Developer |
Subscribe the Snowflake SQS queue to the SNS topic. | Subscribe the Snowflake SQS queue to the SNS topic you created. For help with this step, see the “Related resources” section. | System Administrator, Developer |
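The event configuration above can be expressed as the payload for S3's put_bucket_notification_configuration API. This is a minimal sketch; build_notification_config and the rule ID snowpipe-auto-ingest are hypothetical, and the queue ARN passed in must be the “notification_channel” value from “SHOW PIPES.”

```python
def build_notification_config(snowpipe_sqs_arn):
    """Payload for S3's put_bucket_notification_configuration, routing
    all object-created events in the bucket to the Snowpipe SQS queue
    (the notification_channel value from SHOW PIPES)."""
    return {
        "QueueConfigurations": [
            {
                "Id": "snowpipe-auto-ingest",  # hypothetical rule name
                "QueueArn": snowpipe_sqs_arn,
                # Fire on every object-created event so Snowpipe sees
                # each file that Firehose delivers
                "Events": ["s3:ObjectCreated:*"],
            }
        ]
    }
```

With this in place, every file Firehose writes to the bucket produces a message on the Snowpipe queue, which triggers the auto-ingest pipe.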
Task | Description | Skills required |
---|---|---|
Check and test Snowpipe. | Sign in to Snowflake and open the Snowflake stage. Drop files into your S3 bucket and check whether the Snowflake table loads them. Amazon S3 sends SNS notifications to Snowpipe when new objects appear in the S3 bucket. | System Administrator, Developer |
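Because Snowpipe loads asynchronously, a test should poll rather than check once. This is a minimal sketch of such a check; wait_for_rows is a hypothetical helper, and the cursor is assumed to come from a DB-API style Snowflake client.

```python
import time

def wait_for_rows(cursor, table_name, minimum_rows, timeout_seconds=120):
    """Poll the target table until Snowpipe has loaded at least
    minimum_rows, or the timeout expires. Returns the final count."""
    deadline = time.time() + timeout_seconds
    while True:
        cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
        (count,) = cursor.fetchone()
        if count >= minimum_rows or time.time() >= deadline:
            return count
        time.sleep(10)  # Snowpipe loads typically land within a minute
```

Upload a test file to the S3 bucket, then call this helper against the target table; if the count never reaches the expected value, check the pipe's state with Snowflake's SYSTEM$PIPE_STATUS function and the S3 event configuration.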
Related resources
Additional information
Create a file format:
CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;
Create an external stage:
externalStageParams (for Amazon S3) ::=
  URL = 's3://<bucket>[/<path>/]'
  [ { STORAGE_INTEGRATION = <integration_name> }
    | { CREDENTIALS = ( {   { AWS_KEY_ID = '<string>' AWS_SECRET_KEY = '<string>' [ AWS_TOKEN = '<string>' ] }
                          | AWS_ROLE = '<string>' } ) } ]
  [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '<string>' ]
                 | [ TYPE = 'AWS_SSE_S3' ]
                 | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '<string>' ] ]
                 | [ TYPE = 'NONE' ] ) ]
Create a table:
CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name>
  ( <col_name> <col_type>
      [ { DEFAULT <expr>
        | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ]
      /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */
      [ inlineConstraint ]
    [ , <col_name> <col_type> ... ]
    [ , outoflineConstraint ]
    [ , ... ] )
  [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ]
  [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>'
                          | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
  [ STAGE_COPY_OPTIONS = ( copyOptions ) ]
  [ DATA_RETENTION_TIME_IN_DAYS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
Show stages:
SHOW STAGES;
Create a pipe:
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
AS <copy_statement>
Show pipes:
SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
Create a storage integration:
CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
Example:
create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/') storage_blocked_locations = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');
For more information about this step, see Configuring a Snowflake storage integration to access Amazon S3 in the Snowflake documentation.
Describe an integration:
DESC INTEGRATION <integration_name>;
S3 bucket policy:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectVersion",
        "s3:DeleteObject",
        "s3:DeleteObjectVersion"
      ],
      "Resource": "arn:aws:s3:::<bucket>/<prefix>/*"
    },
    {
      "Effect": "Allow",
      "Action": "s3:ListBucket",
      "Resource": "arn:aws:s3:::<bucket>",
      "Condition": {
        "StringLike": {
          "s3:prefix": [
            "<prefix>/*"
          ]
        }
      }
    }
  ]
}