Automatice la ingesta de flujos de datos en una base de datos de Snowflake mediante Snowflake Snowpipe, Amazon S3, Amazon y Amazon SNS Data Firehose - Recomendaciones de AWS

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Automatice la ingesta de flujos de datos en una base de datos de Snowflake mediante Snowflake Snowpipe, Amazon S3, Amazon y Amazon SNS Data Firehose

Creado por Bikash Chandra Rout () AWS

Entorno: PoC o piloto

Tecnologías: almacenamiento y copia de seguridad

Resumen

Este patrón describe cómo puede utilizar los servicios de la nube de Amazon Web Services (AWS) para procesar un flujo continuo de datos y cargarlo en una base de datos de Snowflake. El patrón utiliza Amazon Data Firehose para enviar los datos a Amazon Simple Storage Service (Amazon S3), Amazon Simple Notification Service (SNSAmazon) para enviar notificaciones cuando se reciben nuevos datos y Snowflake Snowpipe para cargar los datos en una base de datos de Snowflake.

Si sigue este patrón, podrá disponer de los datos generados de forma continua para analizarlos en cuestión de segundos, evitar tener que ejecutar varios COPY comandos manuales y disponer de total compatibilidad con los datos semiestructurados al cargarse.

Requisitos previos y limitaciones

Requisitos previos 

  • Un activo Cuenta de AWS.

  • Una fuente de datos que envía datos de forma continua a un flujo de entrega de Firehose.

  • Un depósito de S3 existente que recibe los datos del flujo de entrega de Firehose.

  • Una cuenta de Snowflake activa.

Limitaciones

  • Snowflake Snowpipe no se conecta directamente a Firehose.

Arquitectura

Los datos ingeridos por Firehose van a Amazon S3, SNS Amazon, Snowflake Snowpipe y Snowflake DB.

Pila de tecnología

  • Amazon Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowpipe

  • Base de datos de Snowflake

Herramientas

  • Amazon Data Firehose es un servicio totalmente gestionado para entregar datos de streaming en tiempo real a destinos como Amazon S3, Amazon Redshift, OpenSearch Amazon Service, Splunk y HTTP cualquier HTTP punto final personalizado propiedad de proveedores de servicios externos compatibles.

  • Amazon Simple Storage Service (Amazon S3) es almacenamiento para Internet.

  • Amazon Simple Notification Service (AmazonSNS) coordina y gestiona la entrega o el envío de mensajes a los puntos de conexión o clientes suscritos.

  • Snowflake: Snowflake es un almacén de datos analíticos que se proporciona como oftware-as-a S-Service (SaaS).

  • Snowflake Snowpipe: Snowpipe carga los datos de los archivos tan pronto como están disponibles en una etapa de Snowflake.

Epics

TareaDescripciónHabilidades requeridas

Cree un archivo en CSV Snowflake.

Inicie sesión en Snowflake y ejecute el CREATE FILE FORMAT comando para crear un CSV archivo con un delimitador de campo específico. Para obtener más información sobre este y otros comandos de Snowflake, consulte la sección de información adicional.

Desarrollador

Cree una etapa de Snowflake externa.

Ejecute el CREATE STAGE comando para crear una etapa Snowflake externa que haga referencia al CSV archivo que creó anteriormente. Importante: Necesitará la clave de acceso y la clave de AWS acceso AWS secreta URL para el bucket de S3. Ejecute el SHOW STAGES comando para comprobar que se ha creado la etapa Snowflake.

Desarrollador

Cree la tabla de destino de Snowflake.

Ejecute el CREATE TABLE comando para crear la tabla Snowflake.

Desarrollador

Cree una canalización.

Ejecute el CREATE PIPE comando; asegúrese de que auto_ingest=true está en el comando. Ejecute el SHOW PIPES comando para comprobar que se ha creado la tubería. Copie y guarde el valor notification_channel de la columna. Este valor se utilizará para configurar las notificaciones de eventos de Amazon S3.

Desarrollador
TareaDescripciónHabilidades requeridas

Cree una política de ciclo de vida de 30 días para el bucket de S3.

Inicie sesión en la consola Amazon S3 AWS Management Console y ábrala. Elige el depósito S3 que contiene los datos de Firehose. A continuación, elija la pestaña Administración en el depósito de S3 y elija Agregar regla de ciclo de vida. Escriba un nombre para la regla en el cuadro de diálogo Regla de ciclo de vida y configure una regla del ciclo de vida de 30 días para su bucket. Para obtener ayuda con esta y otras explicaciones, consulte la sección Recursos relacionados.

Administrador de sistemas, desarrollador

Cree una IAM política para el bucket de S3.

Abre la consola AWS Identity and Access Management (IAM) y selecciona Políticas. Elija Crear política y elija la JSONpestaña. Copia y pega la política de la sección de información adicional en el JSON campo. Esta política PutObject concederá DeleteObject permisos, así como GetObjectGetObjectVersion, y ListBucket permisos. Selecciona Revisar política, introduce un nombre de política y, a continuación, selecciona Crear política.

Administrador de sistemas, desarrollador

Asigne la política a un IAM rol.

Abra la IAM consola, elija Roles y, a continuación, elija Crear rol. Elige Otra AWS cuenta como entidad de confianza. Introduce tu Cuenta de AWS ID y selecciona Requerir ID externo. Introduzca un marcador de posición de ID que podrá cambiar más adelante. Seleccione Siguiente y asigne la IAM política que creó anteriormente. A continuación, cree el IAM rol.

Administrador de sistemas, desarrollador

Copia el nombre del recurso de Amazon (ARN) para el IAM rol.

Abre la IAM consola y selecciona Roles. Elija el IAM rol que creó anteriormente y, a continuación, copie y almacene el rol ARN.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Cree una integración de almacenamiento en Snowflake.

Inicie sesión en Snowflake y ejecute el CREATE STORAGE INTEGRATION comando. Esto modificará la relación de confianza, concederá acceso a Snowflake y proporcionará el ID externo de su etapa de Snowflake.

Administrador de sistemas, desarrollador

Recupera el IAM rol de tu cuenta de Snowflake.

Ejecute el DESC INTEGRATION comando para recuperar el correspondiente al ARN IAM rol.

Importante: <integration_ name> es el nombre de la integración de almacenamiento de Snowflake que creó anteriormente.

Administrador de sistemas, desarrollador

Registre los valores de dos columnas.

Copie y guarde los valores de las columnas storage_aws_iam_user_arn ystorage_aws_external_id.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Modifique la política de IAM roles.

Abre la IAM consola y selecciona Roles. Elija el IAM rol que creó anteriormente y elija la pestaña Relaciones de confianza. Elija Editar relación de confianza. snowflake_external_idReemplácelo por el storage_aws_external_id valor que copió anteriormente. snowflake_user_arnSustitúyalo por el storage_aws_iam_user_arn valor que copió anteriormente. A continuación, selecciona Actualizar política de confianza.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Active las notificaciones de eventos para el bucket de S3.

Abra la consola de Amazon S3 y elija un bucket. Selecciona Propiedades y, en Configuración avanzada, selecciona Eventos. Seleccione Añadir notificación e introduzca un nombre para este evento. Si no ingresas un nombre, se usará un identificador único global (GUID).

Administrador de sistemas, desarrollador

Configura SNS las notificaciones de Amazon para el bucket de S3.

En Eventos, selecciona ObjectCreate (Todos) y, a continuación, selecciona SQSQueue en la lista desplegable Enviar a. En la SNSlista, selecciona Añadir SQS cola ARN y pega el notification_channel valor que copiaste anteriormente. A continuación, elija Guardar.

Administrador de sistemas, desarrollador

Suscribe la SQS cola de Snowflake al tema. SNS

Suscríbete a la lista de Snowflake al tema que SQS has creado. SNS Para obtener ayuda con este paso, consulta la sección de recursos relacionados.

Administrador de sistemas, desarrollador
TareaDescripciónHabilidades requeridas

Revise y pruebe Snowpipe.

Inicie sesión en Snowflake y abra la etapa de Snowflake. Coloque los archivos en su bucket de S3 y compruebe si la tabla Snowflake los carga. Amazon S3 enviará SNS notificaciones a Snowpipe cuando aparezcan nuevos objetos en el bucket de S3.

Administrador de sistemas, desarrollador

Recursos relacionados

Información adicional

Crear un formato de archivo:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

Crear una etapa externa:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

Creación de una tabla:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

Mostrar etapas:

SHOW STAGES;

Creación de una canalización:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

Mostrar canalizaciones:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

Crear una integración de almacenamiento:

CREATE STORAGE INTEGRATION <integration_name> TYPE = EXTERNAL_STAGE STORAGE_PROVIDER = S3 ENABLED = TRUE STORAGE_AWS_ROLE_ARN = '<iam_role>' STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

Ejemplo:

create storage integration s3_int type = external_stage storage_provider = s3 enabled = true storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole' storage_allowed_locations = ('s3://amzn-s3-demo-bucket1/mypath1/', 's3://amzn-s3-demo-bucket2/mypath2/') storage_blocked_locations = ('s3://amzn-s3-demo-bucket1/mypath1/sensitivedata/', 's3://amzn-s3-demo-bucket2/mypath2/sensitivedata/');

Para obtener más información sobre este paso, consulte Configuración de una integración de almacenamiento de Snowflake para acceder a Amazon S3 en la documentación de Snowflake.

Describir una integración:

DESC INTEGRATION <integration_name>;

Política de bucket de S3:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }