Automate data stream ingestion into a Snowflake database by using Snowflake Snowpipe, Amazon S3, Amazon SNS, and Amazon Data Firehose

Created by Bikash Chandra Rout (AWS)

Environment: PoC or pilot

Technologies: Storage & backup

Summary

This pattern describes how you can use services on the Amazon Web Services (AWS) Cloud to process a continuous stream of data and load it into a Snowflake database. The pattern uses Amazon Data Firehose to deliver the data to Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS) to send notifications when new data is received, and Snowflake Snowpipe to load the data into a Snowflake database.

By following this pattern, you can make continuously generated data available for analysis within seconds, avoid running multiple manual COPY commands, and get full support for semi-structured data on load.

Prerequisites and limitations

Prerequisites 

  • An active AWS account.

  • A data source that is continuously sending data to a Firehose delivery stream.

  • An existing S3 bucket that is receiving the data from the Firehose delivery stream.

  • An active Snowflake account.

Limitations 

  • Snowflake Snowpipe doesn't connect directly to Firehose.

Architecture

Architecture diagram: data ingested by Firehose is delivered to Amazon S3, Amazon S3 sends an event notification to Amazon SNS, Amazon SNS notifies Snowflake Snowpipe, and Snowpipe loads the data into the Snowflake database.

Technology stack 

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowpipe

  • Snowflake database

Tools

  • Amazon Data Firehose is a fully managed service for delivering real-time streaming data to destinations such as Amazon S3, Amazon Redshift, Amazon OpenSearch Service, Splunk, and any custom HTTP endpoint or HTTP endpoints owned by supported third-party service providers.

  • Amazon Simple Storage Service (Amazon S3) is storage for the internet.

  • Amazon Simple Notification Service (Amazon SNS) coordinates and manages the delivery or sending of messages to subscribing endpoints or clients.

  • Snowflake is an analytic data warehouse provided as software as a service (SaaS).

  • Snowflake Snowpipe loads data from files as soon as they're available in a Snowflake stage.

Epics

Task | Description | Skills required

Create a CSV file format in Snowflake.

Sign in to Snowflake and run the CREATE FILE FORMAT command to create a CSV file format with a specified field delimiter. For more information about this and other Snowflake commands, see the Additional information section.

Developer

Create an external Snowflake stage.

Run the CREATE STAGE command to create an external Snowflake stage that references the CSV file format you created earlier. Important: You will need the URL for the S3 bucket, your AWS access key, and your AWS secret access key. Run the SHOW STAGES command to verify that the Snowflake stage was created.

Developer

Create the Snowflake target table.

Run the CREATE TABLE command to create the Snowflake table.

Developer

Create a pipe.

Run the CREATE PIPE command; make sure that auto_ingest=true is in the command. Run the SHOW PIPES command to verify that the pipe is created. Copy and save the notification_channel column value. This value will be used to configure Amazon S3 event notifications.

Developer
Task | Description | Skills required

Create a 30-day lifecycle policy for the S3 bucket.

Sign in to the AWS Management Console and open the Amazon S3 console. Choose the S3 bucket that contains the data from Firehose. Then choose the Management tab in the S3 bucket and choose Add lifecycle rule. Enter a name for your rule in the Lifecycle rule dialog box, and configure a 30-day lifecycle rule for your bucket. For help with this and other stories, see the Related resources section.

System Administrator, Developer

Create an IAM policy for the S3 bucket.

Open the AWS Identity and Access Management (IAM) console and choose Policies. Choose Create policy, and choose the JSON tab. Copy and paste the policy from the Additional information section into the JSON field. This policy will grant PutObject and DeleteObject permissions, as well as GetObject, GetObjectVersion, and ListBucket permissions. Choose Review policy, enter a policy name, and then choose Create policy.

System Administrator, Developer

Assign the policy to an IAM role.

Open the IAM console, choose Roles, and then choose Create role. Choose Another AWS account as the trusted entity. Enter your AWS account ID, and choose Require external ID. Enter a placeholder ID; you will change it later. Choose Next, and attach the IAM policy you created earlier. Then create the IAM role.

System Administrator, Developer

Copy the Amazon Resource Name (ARN) for the IAM role.

Open the IAM console, and choose Roles. Choose the IAM role you created earlier, and then copy and store the Role ARN.

System Administrator, Developer
Task | Description | Skills required

Create a storage integration in Snowflake.

Sign in to Snowflake and run the CREATE STORAGE INTEGRATION command. The integration provides the AWS IAM user ARN and external ID that you will use to update the IAM role's trust relationship and grant Snowflake access to the S3 bucket.

System Administrator, Developer

Retrieve the AWS IAM user for your Snowflake account.

Run the DESC INTEGRATION command to retrieve the ARN for the AWS IAM user that was created automatically for your Snowflake account.

Important: <integration_name> is the name of the Snowflake storage integration you created earlier.

System Administrator, Developer

Record two column values.

Copy and save the values for the storage_aws_iam_user_arn and storage_aws_external_id columns.

System Administrator, Developer
Task | Description | Skills required

Modify the IAM role policy.

Open the IAM console and choose Roles. Choose the IAM role you created earlier and choose the Trust relationships tab. Choose Edit trust relationship. Replace snowflake_external_id with the storage_aws_external_id value you copied earlier. Replace snowflake_user_arn with the storage_aws_iam_user_arn value you copied earlier. Then choose Update trust policy.

System Administrator, Developer
Task | Description | Skills required

Turn on event notifications for the S3 bucket.

Open the Amazon S3 console and choose your bucket. Choose Properties, and under Advanced settings, choose Events. Choose Add notification, and enter a name for this event. If you don't enter a name, a globally unique identifier (GUID) will be used.

System Administrator, Developer

Configure Amazon SNS notifications for the S3 bucket.

Under Events, choose ObjectCreate (All), and then choose SQS Queue in the Send to dropdown list. In the SQS list, choose Add SQS queue ARN, and paste the notification_channel value you copied earlier. Then choose Save.

System Administrator, Developer

Subscribe the Snowflake SQS queue to the SNS topic.

Subscribe the Snowflake SQS queue to the SNS topic you created. For help with this step, see the Related resources section.

System Administrator, Developer
Task | Description | Skills required

Check and test Snowpipe.

Sign in to Snowflake and open the Snowflake stage. Drop files into your S3 bucket, and check whether the Snowflake table loads them. Amazon S3 sends event notifications to Snowpipe when new objects appear in the S3 bucket. For example verification queries, see the Additional information section.

System Administrator, Developer

Related resources

Additional information

Create a file format:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;
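
For example, a pipe-delimited file format with one header row might look like the following (my_csv_format is a placeholder name; adjust the delimiter and header settings to match your data):

CREATE OR REPLACE FILE FORMAT my_csv_format
  TYPE = 'CSV'
  FIELD_DELIMITER = '|'
  SKIP_HEADER = 1;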

Create an external stage:

externalStageParams (for Amazon S3) ::=
  URL = 's3://<bucket>[/<path>/]'
  [ { STORAGE_INTEGRATION = <integration_name> }
    | { CREDENTIALS = ( {   { AWS_KEY_ID = '<string>' AWS_SECRET_KEY = '<string>' [ AWS_TOKEN = '<string>' ] }
                          | AWS_ROLE = '<string>' } ) } ]
  [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '<string>' ]
                 | [ TYPE = 'AWS_SSE_S3' ]
                 | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '<string>' ] ]
                 | [ TYPE = NONE ] ) ]
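
For example, a stage that points to the S3 bucket receiving the Firehose data and reuses the file format created earlier might look like the following (the stage name, bucket, path, and credential values are placeholders):

CREATE OR REPLACE STAGE my_s3_stage
  URL = 's3://<bucket>/<path>/'
  CREDENTIALS = ( AWS_KEY_ID = '<aws_access_key_id>' AWS_SECRET_KEY = '<aws_secret_access_key>' )
  FILE_FORMAT = ( FORMAT_NAME = 'my_csv_format' );

After you create the storage integration in a later epic, you can replace the CREDENTIALS clause with STORAGE_INTEGRATION = <integration_name>.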

Create a table:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name>
  ( <col_name> <col_type>
      [ { DEFAULT <expr>
        | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ]
      /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */
      [ inlineConstraint ]
    [ , <col_name> <col_type> ... ]
    [ , outoflineConstraint ]
    [ , ... ] )
  [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ]
  [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
  [ STAGE_COPY_OPTIONS = ( copyOptions ) ]
  [ DATA_RETENTION_TIME_IN_DAYS = <num> ]
  [ COPY GRANTS ]
  [ COMMENT = '<string_literal>' ]
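
For example, a simple target table for a three-column, pipe-delimited stream might look like the following (the table and column names are placeholders; match the columns to your data):

CREATE OR REPLACE TABLE my_stream_table (
  record_id    NUMBER,
  record_time  TIMESTAMP_NTZ,
  payload      VARCHAR
);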

Show stages:

SHOW STAGES;

Create a pipe:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] <name>
  [ AUTO_INGEST = [ TRUE | FALSE ] ]
  [ AWS_SNS_TOPIC = '<string>' ]
  [ INTEGRATION = '<string>' ]
  [ COMMENT = '<string_literal>' ]
  AS <copy_statement>
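
For example, a pipe that automatically copies new files from the stage into the target table might look like the following (it reuses the placeholder names from the previous examples):

CREATE OR REPLACE PIPE my_snowpipe
  AUTO_INGEST = TRUE
  AS
  COPY INTO my_stream_table
  FROM @my_s3_stage
  FILE_FORMAT = ( FORMAT_NAME = 'my_csv_format' );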

Show pipes:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
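
For example, the following command returns the pipe created earlier; the notification_channel column in the output contains the Amazon SQS queue ARN that you use when you configure S3 event notifications:

SHOW PIPES LIKE 'my_snowpipe';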

Create a storage integration:

CREATE STORAGE INTEGRATION <integration_name>
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = S3
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = '<iam_role>'
  STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
  [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

Example:

create storage integration s3_int
  type = external_stage
  storage_provider = s3
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'
  storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/')
  storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

For more information about this step, see Configuring a Snowflake storage integration to access Amazon S3 from the Snowflake documentation.

Describe an integration:

DESC INTEGRATION <integration_name>;
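
For example, with the integration from the preceding example, note the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID property values in the output; you use them to update the IAM role's trust policy:

DESC INTEGRATION s3_int;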

S3 bucket policy:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }