Automate data stream ingestion into a Snowflake database by using Snowflake Snowpipe, Amazon S3, Amazon SNS, and Amazon Data Firehose
Created by Bikash Chandra Rout (AWS)
Environment: PoC or pilot | Technologies: Storage & backup
Summary
This pattern describes how you can use services on the Amazon Web Services (AWS) Cloud to process a continuous stream of data and load it into a Snowflake database. The pattern uses Amazon Data Firehose to deliver the data to Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS) to send notifications when new data is received, and Snowflake Snowpipe to load the data into a Snowflake database.
By following this pattern, you can have continuously generated data available for analysis in seconds, avoid multiple manual COPY commands, and have full support for semi-structured data on load.
Prerequisites and limitations
Prerequisites
An active AWS account.
A data source that is continuously sending data to a Firehose delivery stream.
An existing S3 bucket that is receiving the data from the Firehose delivery stream.
An active Snowflake account.
Limitations
Snowflake Snowpipe doesn't connect directly to Firehose.
Architecture
Technology stack
Amazon Data Firehose
Amazon SNS
Amazon S3
Snowflake Snowpipe
Snowflake database
Tools
Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon OpenSearch Service, Splunk, and any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers.
Amazon Simple Storage Service (Amazon S3) is storage for the internet.
Amazon Simple Notification Service (Amazon SNS) coordinates and manages the delivery or sending of messages to subscribing endpoints or clients.
Snowflake – Snowflake is an analytic data warehouse provided as Software-as-a-Service (SaaS).
Snowflake Snowpipe – Snowpipe loads data from files as soon as they're available in a Snowflake stage.
Epics
Task | Description | Skills required |
---|---|---|
Create a CSV file format in Snowflake. | Sign in to Snowflake and run the CREATE FILE FORMAT command that is shown in the Additional information section. | Developer |
Create an external Snowflake stage. | Run the CREATE STAGE command that is shown in the Additional information section. | Developer |
Create the Snowflake target table. | Run the CREATE TABLE command that is shown in the Additional information section. | Developer |
Create a pipe. | Run the CREATE PIPE command that is shown in the Additional information section. A worked example that combines these four commands follows this table. | Developer |
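The following statements are a minimal worked sketch of these four tasks. The object names (csv_format, my_s3_stage, firehose_data, firehose_pipe), the example columns, the s3_int storage integration, and the amzn-s3-demo-bucket1 path are assumptions; substitute your own names, columns, and S3 location. The storage integration that the stage references is created in a later epic.

-- Hypothetical names throughout; replace them with your own objects and S3 path.
-- File format for pipe-delimited CSV files delivered by Firehose.
CREATE FILE FORMAT csv_format TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

-- External stage that points to the S3 bucket through the storage integration.
CREATE STAGE my_s3_stage
  STORAGE_INTEGRATION = s3_int
  URL = 's3://amzn-s3-demo-bucket1/mypath1/'
  FILE_FORMAT = csv_format;

-- Target table for the streamed records (example columns only).
CREATE TABLE firehose_data (
  event_id   NUMBER,
  event_time TIMESTAMP_NTZ,
  payload    VARCHAR
);

-- Pipe that automatically ingests new files from the stage into the table.
CREATE PIPE firehose_pipe
  AUTO_INGEST = TRUE
  AS COPY INTO firehose_data
     FROM @my_s3_stage
     FILE_FORMAT = (FORMAT_NAME = 'csv_format');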
Task | Description | Skills required |
---|---|---|
Create a 30-day lifecycle policy for the S3 bucket. | Sign in to the AWS Management Console and open the Amazon S3 console. Choose the S3 bucket that contains the data from Firehose. Choose the Management tab for the bucket, and then choose Add lifecycle rule. In the Lifecycle rule dialog box, enter a name for your rule and configure a 30-day expiration rule for your bucket (an example lifecycle configuration follows this table). For help with this and other stories, see the Related resources section. | System Administrator, Developer |
Create an IAM policy for the S3 bucket. | Open the AWS Identity and Access Management (IAM) console and choose Policies. Choose Create policy, and then choose the JSON tab. Copy and paste the policy from the Additional information section into the JSON field. This policy grants Snowflake the access it needs to read, write, and delete objects in the S3 bucket. | System Administrator, Developer |
Assign the policy to an IAM role. | Open the IAM console, choose Roles, and then choose Create role. Choose Another AWS account as the trusted entity. Enter your AWS account ID, and choose Require external ID. Enter a placeholder external ID that you will change later. Choose Next, and assign the IAM policy you created earlier. Then create the IAM role. | System Administrator, Developer |
Copy the Amazon Resource Name (ARN) for the IAM role. | Open the IAM console, and choose Roles. Choose the IAM role you created earlier, and then copy and store the Role ARN. | System Administrator, Developer |
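If you prefer to script the 30-day expiration instead of using the console, the following lifecycle configuration is a minimal sketch that you can apply with the S3 PutBucketLifecycleConfiguration API (for example, through the aws s3api put-bucket-lifecycle-configuration command). The rule ID and the empty prefix filter are assumptions.

{
    "Rules": [
        {
            "ID": "expire-firehose-data-after-30-days",
            "Status": "Enabled",
            "Filter": { "Prefix": "" },
            "Expiration": { "Days": 30 }
        }
    ]
}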
Task | Description | Skills required |
---|---|---|
Create a storage integration in Snowflake. | Sign in to Snowflake and run the CREATE STORAGE INTEGRATION command that is shown in the Additional information section. | System Administrator, Developer |
Retrieve the AWS IAM user for your Snowflake account. | Run the DESC INTEGRATION command that is shown in the Additional information section to view the AWS IAM user and external ID that Snowflake created for the storage integration. Important: If you later recreate the storage integration, the external ID changes and you must update the IAM role trust policy again. | System Administrator, Developer |
Record two column values. | Copy and save the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values from the DESC INTEGRATION output. You need these values to update the IAM role trust policy (an example query follows this table). | System Administrator, Developer |
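As a sketch, assuming the integration is named s3_int as in the example in the Additional information section, you can retrieve the two values as follows. The RESULT_SCAN filter is optional.

-- Describe the integration; note the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID rows.
DESC INTEGRATION s3_int;

-- Optionally, filter the previous result down to the two properties you need to record.
SELECT "property", "property_value"
  FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
  WHERE "property" IN ('STORAGE_AWS_IAM_USER_ARN', 'STORAGE_AWS_EXTERNAL_ID');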
Task | Description | Skills required |
---|---|---|
Modify the IAM role trust policy. | Open the IAM console and choose Roles. Choose the IAM role you created earlier, and then choose the Trust relationships tab. Choose Edit trust relationship. In the trust policy, replace the placeholder external ID with the STORAGE_AWS_EXTERNAL_ID value, and set the trusted principal to the STORAGE_AWS_IAM_USER_ARN value that you recorded earlier (an example trust policy follows this table). | System Administrator, Developer |
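After the edit, the trust policy should look similar to the following sketch. The angle-bracket values are placeholders for the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values that you recorded from the DESC INTEGRATION output.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
                }
            }
        }
    ]
}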
Task | Description | Skills required |
---|---|---|
Turn on event notifications for the S3 bucket. | Open the Amazon S3 console and choose your bucket. Choose Properties, and under Advanced settings, choose Events. Choose Add notification, and enter a name for this event. If you don't enter a name, a globally unique identifier (GUID) will be used. | System Administrator, Developer |
Configure Amazon SNS notifications for the S3 bucket. | Under Events, choose ObjectCreate (All). In the Send to dropdown list, choose SNS Topic, and then choose the SNS topic (or paste its ARN) that should receive the new-object notifications. This is the same topic that the AWS_SNS_TOPIC parameter of the CREATE PIPE command references. | System Administrator, Developer |
Subscribe the Snowflake SQS queue to the SNS topic. | Subscribe the Snowflake SQS queue to the SNS topic you created. For help with this step, see the Related resources section and the example statement after this table. | System Administrator, Developer |
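Snowflake's SYSTEM$GET_AWS_SNS_IAM_POLICY function returns the policy statement that grants the Snowflake SQS queue permission to subscribe to your SNS topic; add that statement to the topic's access policy. The topic ARN in this sketch is a placeholder.

-- Returns a policy statement to add to the SNS topic access policy so that Snowflake's
-- SQS queue can subscribe to the topic. Replace the placeholder topic ARN with your own.
SELECT SYSTEM$GET_AWS_SNS_IAM_POLICY('arn:aws:sns:us-east-1:001234567890:firehose-snowpipe-topic');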
Task | Description | Skills required |
---|---|---|
Check and test Snowpipe. | Sign in to Snowflake and open the Snowflake stage. Drop files into your S3 bucket and check whether the Snowflake table loads them. Amazon S3 sends SNS notifications to Snowpipe when new objects appear in the bucket. The statements after this table show how to check the pipe status and the load history. | System Administrator, Developer |
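The following statements are a minimal verification sketch, assuming the hypothetical firehose_pipe and firehose_data names from the earlier example.

-- Check whether the pipe is running and receiving notifications.
SELECT SYSTEM$PIPE_STATUS('firehose_pipe');

-- Review the files that Snowpipe loaded into the target table during the past hour.
SELECT *
  FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'FIREHOSE_DATA',
    START_TIME => DATEADD(hour, -1, CURRENT_TIMESTAMP())));

-- Confirm that new rows arrived.
SELECT COUNT(*) FROM firehose_data;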
Related resources
Additional information
Create a file format:
CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;
Create an external stage:
externalStageParams (for Amazon S3) ::=
  URL = 's3://<bucket>[/<path>/]'
  [ { STORAGE_INTEGRATION = <integration_name> } | { CREDENTIALS = ( { { AWS_KEY_ID = '<string>' AWS_SECRET_KEY = '<string>' [ AWS_TOKEN = '<string>' ] } | AWS_ROLE = '<string>' } ) } ]
  [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '<string>' ] |
                   [ TYPE = 'AWS_SSE_S3' ] |
                   [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '<string>' ] ] |
                   [ TYPE = NONE ] ) ]
Create a table:
CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name>
  ( <col_name> <col_type>
      [ { DEFAULT <expr>
          | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ]
      /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */
      [ inlineConstraint ]
    [ , <col_name> <col_type> ... ]
    [ , outoflineConstraint ]
    [ , ... ] )
  [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ]
  [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
  [ STAGE_COPY_OPTIONS = ( copyOptions ) ]
  [ DATA_RETENTION_TIME_IN_DAYS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
Show stages:
SHOW STAGES;
Create a pipe:
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
Show pipes:
SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
Create a storage integration:
CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
Example:
create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');
For more information about this step, see Configuring a Snowflake storage integration to access Amazon S3 in the Snowflake documentation.
Describe an integration:
DESC INTEGRATION <integration_name>;
IAM policy for the S3 bucket:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }