Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Automatizza l'inserimento di flussi di dati in un database Snowflake utilizzando Snowflake Snowpipe, Amazon S3, Amazon SNS e Amazon Data Firehose
Creato da Bikash Chandra Rout (AWS)
Riepilogo
Questo modello descrive come utilizzare i servizi su Amazon Web Services (AWS) Cloud per elaborare un flusso continuo di dati e caricarlo in un database Snowflake. Il modello utilizza Amazon Data Firehose per inviare i dati ad Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (Amazon SNS) per inviare notifiche quando vengono ricevuti nuovi dati e Snowflake Snowpipe per caricare i dati in un database Snowflake.
Seguendo questo schema, puoi avere a disposizione i dati generati continuamente per l'analisi in pochi secondi, evitare più COPY
comandi manuali e avere il supporto completo per i dati semistrutturati in fase di caricamento.
Prerequisiti e limitazioni
Prerequisiti
Un attivo Account AWS.
Una fonte di dati che invia continuamente dati a un flusso di distribuzione Firehose.
Un bucket S3 esistente che riceve i dati dal flusso di distribuzione Firehose.
Un account Snowflake attivo.
Limitazioni
Snowflake Snowpipe non si collega direttamente a Firehose.
Architettura

Stack tecnologico
Amazon Data Firehose
Amazon SNS
Amazon S3
Snowflake Snowpipe
Banca dati Snowflake
Strumenti
Amazon Data Firehose è un servizio completamente gestito per la distribuzione di dati di streaming in tempo reale a destinazioni come Amazon S3, Amazon Redshift, OpenSearch Amazon Service, Splunk e qualsiasi endpoint HTTP o endpoint HTTP personalizzato di proprietà di provider di servizi terzi supportati.
Amazon Simple Storage Service (Amazon S3) Simple Storage Service (Amazon S3) è uno storage per Internet.
Amazon Simple Notification Service (Amazon SNS) coordina e gestisce la consegna o l'invio di messaggi agli endpoint o ai clienti abbonati.
Snowflake
— Snowflake è un data warehouse analitico fornito come (SaaS). Software-as-a-Service Snowflake Snowpipe: Snowpipe
carica i dati dai file non appena sono disponibili in una fase Snowflake.
Epiche
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea un file CSV in Snowflake. | Accedi a Snowflake ed esegui il | Developer |
Crea uno stage Snowflake esterno. | Eseguite il | Developer |
Create la tabella di destinazione Snowflake. | Esegui il | Developer |
Crea una pipa. | Esegui il | Developer |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea una politica del ciclo di vita di 30 giorni per il bucket S3. | Accedi AWS Management Console e apri la console Amazon S3. Scegliete il bucket S3 che contiene i dati di Firehose. Quindi scegli la scheda Gestione nel bucket S3 e scegli Aggiungi regola del ciclo di vita. Inserisci un nome per la regola nella finestra di dialogo delle regole del ciclo di vita e configura una regola del ciclo di vita di 30 giorni per il tuo bucket. Per informazioni su questa e altre storie, consulta la sezione Risorse correlate. | Amministratore di sistema, sviluppatore |
Crea una policy IAM per il bucket S3. | Apri la console AWS Identity and Access Management (IAM) e scegli Policies. Scegli Create policy (Crea policy), quindi scegli la scheda JSON. Copia e incolla la policy dalla sezione Informazioni aggiuntive nel campo JSON. Questa politica concederà | Amministratore di sistema, sviluppatore |
Assegna la policy a un ruolo IAM. | Apri la console IAM, scegli Ruoli, quindi scegli Crea ruolo. Scegli un altro account AWS come entità affidabile. Inserisci il tuo Account AWS ID e scegli Richiedi un ID esterno. Inserisci un ID segnaposto che potrai modificare in seguito. Scegli Avanti e assegna la policy IAM che hai creato in precedenza. Quindi crea il ruolo IAM. | Amministratore di sistema, sviluppatore |
Copia l'Amazon Resource Name (ARN) per il ruolo IAM. | Apri la console IAM e scegli Ruoli. Scegli il ruolo IAM che hai creato in precedenza, quindi copia e archivia l'ARN del ruolo. | Amministratore di sistema, sviluppatore |
Attività | Descrizione | Competenze richieste |
---|---|---|
Crea un'integrazione di archiviazione in Snowflake. | Accedi a Snowflake ed esegui il comando. | Amministratore di sistema, sviluppatore |
Recupera il ruolo IAM per il tuo account Snowflake. | Esegui il Importante
| Amministratore di sistema, sviluppatore |
Registra due valori di colonna. | Copia e salva i valori per le | Amministratore di sistema, sviluppatore |
Attività | Descrizione | Competenze richieste |
---|---|---|
Modifica la politica dei ruoli IAM. | Apri la console IAM e scegli Ruoli. Scegli il ruolo IAM che hai creato in precedenza e scegli la scheda Relazioni di fiducia. Seleziona Modifica relazione di attendibilità. | Amministratore di sistema, sviluppatore |
Attività | Descrizione | Competenze richieste |
---|---|---|
Attiva le notifiche degli eventi per il bucket S3. | Apri la console Amazon S3 e scegli il tuo bucket. Scegli Proprietà e, in Impostazioni avanzate, scegli Eventi. Scegli Aggiungi notifica e inserisci un nome per questo evento. Se non inserisci un nome, verrà utilizzato un identificatore univoco globale (GUID). | Amministratore di sistema, sviluppatore |
Configura le notifiche Amazon SNS per il bucket S3. | In Eventi, scegli ObjectCreate (Tutti), quindi scegli SQS Queue nell'elenco a discesa Invia a. Nell'elenco SNS, scegli Aggiungi ARN della coda SQS e incolla | Amministratore di sistema, sviluppatore |
Sottoscrivi la coda Snowflake SQS all'argomento SNS. | Sottoscrivi la coda Snowflake SQS all'argomento SNS che hai creato. Per informazioni su questo passaggio, consulta la sezione Risorse correlate. | Amministratore di sistema, sviluppatore |
Attività | Descrizione | Competenze richieste |
---|---|---|
Controlla e prova Snowpipe. | Accedi a Snowflake e apri il livello Snowflake. Trascina i file nel tuo bucket S3 e controlla se la tabella Snowflake li carica. Amazon S3 invierà notifiche SNS a Snowpipe quando vengono visualizzati nuovi oggetti nel bucket S3. | Amministratore di sistema, sviluppatore |
Risorse correlate
Informazioni aggiuntive
Crea un formato di file:
CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;
Crea una fase esterna:
externalStageParams (for Amazon S3) ::=
URL = 's3://[//]'
[ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ]
[ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] |
[ TYPE = 'AWS_SSE_S3' ] |
[ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] |
[ TYPE = NONE ] )
Crea una tabella:
CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ]
<table_name>
( <col_name> <col_type> [ { DEFAULT <expr>
| { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ]
/* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */
[ inlineConstraint ]
[ , <col_name> <col_type> ... ]
[ , outoflineConstraint ]
[ , ... ] )
[ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ]
[ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>'
| TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ]
[ STAGE_COPY_OPTIONS = ( copyOptions ) ]
[ DATA_RETENTION_TIME_IN_DAYS = <num> ]
[ COPY GRANTS ]
[ COMMENT = '<string_literal>' ]
Mostra fasi:
SHOW STAGES;
Crea una pipa:
CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ]
[ AUTO_INGEST = [ TRUE | FALSE ] ]
[ AWS_SNS_TOPIC = ]
[ INTEGRATION = '' ]
[ COMMENT = '' ]
AS
Mostra tubi:
SHOW PIPES [ LIKE '<pattern>' ]
[ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]
Crea un'integrazione di archiviazione:
CREATE STORAGE INTEGRATION <integration_name>
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<iam_role>'
STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')
[ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]
Esempio:
create storage integration s3_int
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'
storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/')
storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');
Per ulteriori informazioni su questo passaggio, consulta Configurazione di un'integrazione di storage Snowflake per accedere ad Amazon S3 dalla documentazione di Snowflake
Descrivi un'integrazione:
DESC INTEGRATION <integration_name>;
Politica sui bucket S3:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:GetObjectVersion",
"s3:DeleteObject",
"s3:DeleteObjectVersion"
],
"Resource": "arn:aws:s3::://*"
},
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::",
"Condition": {
"StringLike": {
"s3:prefix": [
"/*"
]
}
}
}
]
}