Aceleración de los rastreadores mediante las notificaciones de eventos de Amazon S3 - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Aceleración de los rastreadores mediante las notificaciones de eventos de Amazon S3

En lugar de publicar los objetos de un destino de Amazon S3 o del Catálogo de datos, puede configurar el rastreador para que utilice eventos de Amazon S3 para buscar cualquier cambio. Esta característica mejora el tiempo de rastreo mediante el uso de eventos de Amazon S3 o del Catálogo de datos para identificar los cambios entre dos rastreos al enumerar todos los archivos de la subcarpeta que activó el evento en lugar de publicar el destino completo de Amazon S3 o el Catálogo de datos.

En el primer rastreo se enumeran todos los objetos de Amazon S3 del destino. Después del primer rastreo exitoso, puede optar por volver a rastrear manualmente o según un calendario establecido. El rastreador enumerará solo los objetos de esos eventos en lugar de enumerar todos los objetos.

Las ventajas de pasar a un rastreador basado en eventos de Amazon S3 son:

  • Un nuevo rastreo más rápido, ya que la lista de todos los objetos del destino no es necesaria, sino que la lista de carpetas específicas se realiza cuando se agregan o eliminan objetos.

  • Reducción del costo global de rastreo a medida que la lista de carpetas específicas se realiza en las que se agregan o eliminan objetos.

El rastreo de eventos de Amazon S3 se ejecuta al consumir eventos de Amazon S3 desde la cola de SQS según la programación del rastreador. No habrá ningún costo si no hay eventos en la cola. Los eventos de Amazon S3 se pueden configurar para que vayan directamente a la cola de SQS o en los casos en que varios consumidores necesitan el mismo evento, una combinación de SNS y SQS. Para obtener más información, consulte Configuración de la cuenta para las notificaciones de eventos de Amazon S3.

Después de crear y configurar el rastreador en modo evento, el primer rastreo se ejecuta en modo listado y enumera un listado completo del destino de Amazon S3 o del Catálogo de datos. A través del siguiente registro se confirma el funcionamiento del rastreo mediante el uso de eventos de Amazon S3 tras el primer rastreo correcto: “El rastreo se ejecuta mediante el uso de eventos de Amazon S3”.

Después de crear el rastreo de eventos de Amazon S3 y actualizar las propiedades del rastreador que pueden afectar al rastreo, el rastreo funciona en modo lista y se agrega el siguiente registro: “El rastreo no se ejecuta en modo de evento de S3”.

Catálogo de datos como destino

Cuando el destino es el Catálogo de datos, el rastreador actualiza las tablas existentes en el Catálogo de datos con los cambios (por ejemplo, particiones adicionales en una tabla).

Configuración de la cuenta para las notificaciones de eventos de Amazon S3

En esta sección, se describe cómo configurar la cuenta para las notificaciones de eventos de Amazon S3 y se proporcionan instrucciones para hacerlo mediante un script o una consola de AWS Glue.

Requisitos previos

Realice los siguientes pasos de configuración. Tenga en cuenta que los valores entre paréntesis hacen referencia a los valores configurables del script.

  1. Cree un bucket de Amazon S3 (s3_bucket_name)

  2. Identificar un objetivo de rastreador (folder_name, como “test1") que es una ruta en el bucket identificado.

  3. Preparar un nombre de rastreador (crawler_name)

  4. Prepare un nombre de tema de SNS (sns_topic_name) que podría ser el mismo que el nombre del rastreador.

  5. Prepare la región de AWS en la que se va a ejecutar el rastreador y donde existe el bucket de S3 (region).

  6. Si lo desea, prepare una dirección de correo electrónico si se utiliza el correo electrónico para obtener los eventos de Amazon S3 (subscribing_email).

También puede usar la pila CloudFormation para crear sus recursos. Realice los pasos siguientes:

  1. Lance su pila CloudFormation en el Este de EE. UU. (Norte de Virginia):

  2. En Parámetros, ingrese un nombre para su bucket de Amazon S3 (incluye su número de cuenta).

  3. Seleccionar I acknowledge that AWS CloudFormation might create IAM resources with custom names.

  4. Elija Create stack.

Limitaciones:

  • El rastreador admite un solo destino, ya sean destinos para Amazon S3 o para el Catálogo de datos.

  • No es posible utilizar SQS en una VPC privada.

  • No se admite el muestreo de Amazon S3.

  • El destino del rastreador debe ser una carpeta para un destino de Amazon S3 o una o más tablas del Catálogo de datos de AWS Glue para un destino del Catálogo de datos.

  • No se admite el comodín de la ruta “todo”: s3://%

  • Para un destino de Catálogo de datos, todas las tablas del Catálogo deben apuntar al mismo bucket de Amazon S3 para el modo de eventos de Amazon S3.

  • Para un destino de Catálogo de datos, una tabla de catálogo no debe apuntar a una ubicación de Amazon S3 en formato Delta Lake (que contenga carpetas _symlink o consulte las tablas del catálogo InputFormat).

Para utilizar el rastreador basado en eventos de Amazon S3, debe habilitar la notificación de eventos en el bucket de S3 con eventos filtrados del prefijo, que es el mismo que el destino de S3 y el almacenamiento en SQS. Puede configurar SQS y la notificación de eventos a través de la consola siguiendo los pasos del Tutorial: configuración de un bucket para notificaciones o mediante Script para generar SQS y configurar eventos de Amazon S3 desde el destino.

Política de SQS

Agregue la siguiente política de SQS que debe adjuntarse al rol utilizado por el rastreador.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "arn:aws:sqs:{region}:{accountID}:cfn-sqs-queue" } ] }

Script para generar SQS y configurar eventos de Amazon S3 desde el destino

Una vez garantizado que se cumplen los requisitos previos, puede ejecutar el siguiente script de Python para crear el SQS. Sustituya la configuración configurable por los nombres preparados a partir de los requisitos previos.

nota

Después de ejecutar el script, inicie sesión en la consola SQS para buscar el ARN del SQS creado.

Amazon SQS establece un tiempo de espera de visibilidad, un periodo durante el cual Amazon SQS impide que otros consumidores reciban y procesen el mensaje. Establezca el tiempo de espera de visibilidad aproximadamente igual al tiempo de ejecución de rastreo.

#!venv/bin/python import boto3 import botocore #---------Start : READ ME FIRST ----------------------# # 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification. # The following are the operations performed by the scripts: # a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events. # b. Create SNS topic for fan out. # c. Create SQS queue for saving events which will be consumed by the crawler. # SQS Event Queue ARN will be used to create the crawler after running the script. # 2. This script does not create the crawler. # 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another # purpose, SNS topic created by the script can be used. # 1. Creation of bucket is an optional step. # To create a bucket set create_bucket variable to true. # 2. The purpose of crawler_name is to easily locate the SQS/SNS. # crawler_name is used to create SQS and SNS with the same name as crawler. # 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name' # #---------End : READ ME FIRST ------------------------# #--------------------------------# # Start : Configurable settings # #--------------------------------# #Create region = 'us-west-2' s3_bucket_name = 's3eventtestuswest2' folder_name = "test" crawler_name = "test33S3Event" sns_topic_name = crawler_name sqs_queue_name = sns_topic_name create_bucket = False #-------------------------------# # End : Configurable settings # #-------------------------------# # Define aws clients dev = boto3.session.Session(profile_name='myprofile') boto3.setup_default_session(profile_name='myprofile') s3 = boto3.resource('s3', region_name=region) sns = boto3.client('sns', region_name=region) sqs = boto3.client('sqs', region_name=region) client = boto3.client("sts") account_id = client.get_caller_identity()["Account"] queue_arn = "" def print_error(e): print(e.message + ' RequestId: ' + e.response['ResponseMetadata']['RequestId']) def create_s3_bucket(bucket_name, client): bucket = client.Bucket(bucket_name) try: if not create_bucket: return True response = bucket.create( ACL='private', CreateBucketConfiguration={ 'LocationConstraint': region }, ) return True except botocore.exceptions.ClientError as e: print_error(e) if 'BucketAlreadyOwnedByYou' in e.message: # we own this bucket so continue print('We own the bucket already. Lets continue...') return True return False def create_s3_bucket_folder(bucket_name, client, directory_name): s3.put_object(Bucket=bucket_name, Key=(directory_name + '/')) def set_s3_notification_sns(bucket_name, client, topic_arn): bucket_notification = client.BucketNotification(bucket_name) try: response = bucket_notification.put( NotificationConfiguration={ 'TopicConfigurations': [ { 'Id' : crawler_name, 'TopicArn': topic_arn, 'Events': [ 's3:ObjectCreated:*', 's3:ObjectRemoved:*', ], 'Filter' : {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}} }, ] } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def create_sns_topic(topic_name, client): try: response = client.create_topic( Name=topic_name ) return response['TopicArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sns_topic_policy(topic_arn, client, bucket_name): try: response = client.set_topic_attributes( TopicArn=topic_arn, AttributeName='Policy', AttributeValue='''{ "Version": "2008-10-17", "Id": "s3-publish-to-sns", "Statement": [{ "Effect": "Allow", "Principal": { "AWS" : "*" }, "Action": [ "SNS:Publish" ], "Resource": "%s", "Condition": { "StringEquals": { "AWS:SourceAccount": "%s" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" } } }] }''' % (topic_arn, account_id, bucket_name) ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint): try: response = client.subscribe( TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint ) return response['SubscriptionArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def create_sqs_queue(queue_name, client): try: response = client.create_queue( QueueName=queue_name, ) return response['QueueUrl'] except botocore.exceptions.ClientError as e: print_error(e) return None def get_sqs_queue_arn(queue_url, client): try: response = client.get_queue_attributes( QueueUrl=queue_url, AttributeNames=[ 'QueueArn', ] ) return response['Attributes']['QueueArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sqs_policy(queue_url, queue_arn, client, topic_arn): try: response = client.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': '''{ "Version": "2012-10-17", "Id": "AllowSNSPublish", "Statement": [ { "Sid": "AllowSNSPublish01", "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": "%s", "Condition": { "ArnEquals": { "aws:SourceArn": "%s" } } } ] }''' % (queue_arn, topic_arn) } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False if __name__ == "__main__": print('Creating S3 bucket %s.' % s3_bucket_name) if create_s3_bucket(s3_bucket_name, s3): print('\nCreating SNS topic %s.' % sns_topic_name) topic_arn = create_sns_topic(sns_topic_name, sns) if topic_arn: print('SNS topic created successfully: %s' % topic_arn) print('Creating SQS queue %s' % sqs_queue_name) queue_url = create_sqs_queue(sqs_queue_name, sqs) if queue_url is not None: print('Subscribing sqs queue with sns.') queue_arn = get_sqs_queue_arn(queue_url, sqs) if queue_arn is not None: if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn): print('Successfully configured queue policy.') subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn) if subscription_arn is not None: if 'pending confirmation' in subscription_arn: print('Please confirm SNS subscription by visiting the subscribe URL.') else: print('Successfully subscribed SQS queue: ' + queue_arn) else: print('Failed to subscribe SNS') else: print('Failed to set queue policy.') else: print("Failed to get queue arn for %s" % queue_url) # ------------ End subscriptions to SNS topic ----------------- print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name) if set_sns_topic_policy(topic_arn, sns, s3_bucket_name): print('SNS topic policy added successfully.') if set_s3_notification_sns(s3_bucket_name, s3, topic_arn): print('Successfully configured event for S3 bucket %s' % s3_bucket_name) print('Create S3 Event Crawler using SQS ARN %s' % queue_arn) else: print('Failed to configure S3 bucket notification.') else: print('Failed to add SNS topic policy.') else: print('Failed to create SNS topic.')

Configuración de un rastreador para notificaciones de eventos de Amazon S3 mediante la consola (Amazon S3 como destino)

Para configurar un rastreador para las notificaciones de eventos de Amazon S3 mediante la consola de AWS Glue para un destino de Amazon S3:

  1. Configure las propiedades del rastreador. Para obtener más información, consulte Opciones de configuración de rastreadores en la consola de AWS Glue.

  2. En la sección Configuración de origen de datos, se preguntará ¿Los datos ya están asignados a tablas de AWS Glue?

    De manera predeterminada, está seleccionado Not yet (Aún no). Déjelo así si está utilizando un origen de datos de Amazon S3 y los datos aún no están asignados a tablas de AWS Glue.

  3. En la sección Data sources (Origen de datos), elija Add a data source (Agregar un origen de datos).

  4. En el modal Add data source (Agregar origen de datos), configure el origen de datos de Amazon S3:

    • Data source (Origen de datos): de manera predeterminada, está seleccionado Amazon S3.

    • Network connection (Conexión de red) (opcional): elija Add new connection (Agregar nueva conexión).

    • Location of Amazon S3 data (Ubicación de datos de Amazon S3): de manera predeterminada, está seleccionado In this account (En esta cuenta).

    • Amazon S3 path (Ruta de Amazon S3): especifique la ruta de Amazon S3 en la que se rastrean carpetas y archivos.

    • Subsequent crawler runs (Ejecuciones posteriores del rastreador): elija Crawl based on events (Rastreo basado en eventos) para utilizar las notificaciones de eventos de Amazon S3 para el rastreador.

    • Include SQS ARN (Incluir ARN de SQS): especifique los parámetros del almacén de datos, incluido un ARN SQS válido. (Por ejemplo, arn:aws:sqs:region:account:sqs).

    • Include dead-letter SQS ARN (Incluir un SQS ARN de mensajes fallidos) (Optional): especifique un ARN de SQS con mensajes erróneos de Amazon válido. (Por ejemplo, arn:aws:sqs:region:account:deadLetterQueue).

    • Elija Add an Amazon S3 data source (Agregar un origen de datos de Amazon S3).

Configuración de un rastreador para notificaciones de eventos de Amazon S3 mediante la AWS CLI

El siguiente es un ejemplo de llamada a la AWS CLI de Amazon S3 para crear colas de SQS y configurar notificaciones de eventos en el bucket de destino de Amazon S3.

S3 Event AWS CLI aws sqs create-queue --queue-name MyQueue --attributes file://create-queue.json create-queue.json ``` { "Policy": { "Version": "2012-10-17", "Id": "example-ID", "Statement": [ { "Sid": "example-statement-ID", "Effect": "Allow", "Principal": { "Service": "s3.amazonaws.com" }, "Action": [ "SQS:SendMessage" ], "Resource": "SQS-queue-ARN", "Condition": { "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1" }, "StringEquals": { "aws:SourceAccount": "bucket-owner-account-id" } } } ] } } ``` aws s3api put-bucket-notification-configuration --bucket customer-data-pdx --notification-configuration file://s3-event-config.json s3-event-config.json ``` { "QueueConfigurations": [ { "Id": "s3event-sqs-queue", "QueueArn": "arn:aws:sqs:{region}:{account}:queuename", "Events": [ "s3:ObjectCreated:*", "s3:ObjectRemoved:*" ], "Filter": { "Key": { "FilterRules": [ { "Name": "Prefix", "Value": "/json" } ] } } } ] } ``` Create Crawler:

Configuración de un rastreador para notificaciones de eventos de Amazon S3 mediante la consola (el Catálogo de datos como destino)

Cuando el destino sea un catálogo, configure un rastreador para las notificaciones de eventos de Amazon S3 mediante la consola de AWS Glue:

  1. Configure las propiedades del rastreador. Para obtener más información, consulte Opciones de configuración de rastreadores en la consola de AWS Glue.

  2. En la sección Configuración de origen de datos, se preguntará ¿Los datos ya están asignados a tablas de AWS Glue?

    Seleccione Yes (Sí) para seleccionar las tablas existentes de su Catálogo de datos como origen de datos.

  3. En la sección Glue tables (Tablas de Glue), seleccione Add tables (Agregar tablas).

  4. En el modal Add table (Agregar tabla), configure la base de datos y las tablas:

    • Network connection (Conexión de red) (opcional): elija Add new connection (Agregar nueva conexión).

    • Base de datos: seleccione una base de datos en el Catálogo de datos.

    • Tablas: seleccione una o más tablas de esa base de datos en el Catálogo de datos.

    • Subsequent crawler runs (Ejecuciones posteriores del rastreador): elija Crawl based on events (Rastreo basado en eventos) para utilizar las notificaciones de eventos de Amazon S3 para el rastreador.

    • Include SQS ARN (Incluir ARN de SQS): especifique los parámetros del almacén de datos, incluido un ARN SQS válido. (Por ejemplo, arn:aws:sqs:region:account:sqs).

    • Include dead-letter SQS ARN (Incluir un SQS ARN de mensajes fallidos) (Optional): especifique un ARN de SQS con mensajes erróneos de Amazon válido. (Por ejemplo, arn:aws:sqs:region:account:deadLetterQueue).

    • Elija Confirmar.