
Accelerating crawls using Amazon S3 event notifications

Instead of listing the objects from an Amazon S3 or Data Catalog target, you can configure the crawler to use Amazon S3 events to find any changes. This feature improves recrawl time by using Amazon S3 events to identify the changes between two crawls; the crawler lists only the files from the subfolders that triggered events, instead of listing the full Amazon S3 or Data Catalog target.

The first crawl lists all Amazon S3 objects from the target. After the first successful crawl, you can choose to recrawl manually or on a set schedule. On subsequent crawls, the crawler lists only the objects reported by those events instead of listing all objects.

The advantages of moving to an Amazon S3 event based crawler are:

  • A faster recrawl, because the crawler does not need to list all objects from the target; it lists only the specific folders where objects were added or deleted.

  • A reduction in the overall crawl cost, because the crawler lists only the specific folders where objects were added or deleted.

The Amazon S3 event crawl runs by consuming Amazon S3 events from the SQS queue based on the crawler schedule. There is no cost if there are no events in the queue. Amazon S3 events can be configured to go directly to the SQS queue or, in cases where multiple consumers need the same event, through a combination of SNS and SQS. For more information, see Setting up your Account for Amazon S3 event notifications.
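
To illustrate what the crawler consumes, the following is a minimal sketch (not part of the crawler itself) that polls the SQS queue and extracts the bucket and key from each Amazon S3 event. The queue URL is a hypothetical placeholder; when events are fanned out through SNS, the S3 record is wrapped in the SNS "Message" field, which the sketch accounts for.

```python
import json
import boto3

# Hypothetical queue URL; replace with the URL of the queue that receives the S3 events.
QUEUE_URL = "https://sqs.us-west-2.amazonaws.com/123456789012/my-s3-event-queue"

sqs = boto3.client("sqs", region_name="us-west-2")

response = sqs.receive_message(QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=5)
for message in response.get("Messages", []):
    body = json.loads(message["Body"])
    # If the event was delivered through SNS, the S3 notification is nested in 'Message'.
    s3_event = json.loads(body["Message"]) if "Message" in body else body
    for record in s3_event.get("Records", []):
        print(record["eventName"],
              record["s3"]["bucket"]["name"],
              record["s3"]["object"]["key"])
```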

After creating and configuring the crawler in event mode, the first crawl runs in listing mode by performing a full listing of the Amazon S3 or Data Catalog target. The following log confirms the operation of the crawl by consuming Amazon S3 events after the first successful crawl: "The crawl is running by consuming Amazon S3 events."
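
If you want to check for this message programmatically, the following is a minimal sketch, assuming the crawler writes to the default /aws-glue/crawlers log group; the crawler name used here is hypothetical.

```python
import boto3

logs = boto3.client("logs", region_name="us-west-2")

# "my-s3-event-crawler" is a hypothetical crawler name; crawler logs use the crawler
# name as the log stream name in the /aws-glue/crawlers log group.
response = logs.filter_log_events(
    logGroupName="/aws-glue/crawlers",
    logStreamNamePrefix="my-s3-event-crawler",
    filterPattern='"consuming Amazon S3 events"',
)
for event in response.get("events", []):
    print(event["message"])
```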

If you update crawler properties that can impact the crawl after creating an Amazon S3 event crawler, the crawl operates in list mode and the following log is added: "Crawl is not running in S3 event mode".

Catalog target

When the target is the Data Catalog, the crawler updates the existing tables in the Data Catalog with changes (for example, extra partitions in a table).
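
As an illustration, the following is a minimal sketch, using hypothetical database and table names, that lists a catalog table's partitions so you can confirm the crawler picked up new ones after an event-based run.

```python
import boto3

glue = boto3.client("glue", region_name="us-west-2")

# "my_database" and "my_table" are hypothetical names; substitute the catalog
# database and table that the crawler targets.
paginator = glue.get_paginator("get_partitions")
for page in paginator.paginate(DatabaseName="my_database", TableName="my_table"):
    for partition in page["Partitions"]:
        print(partition["Values"])
```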

Setting up your Account for Amazon S3 event notifications

This section describes how to set up your account for Amazon S3 event notifications, and provides instructions for doing so using a script or the AWS Glue console.

Prerequisites

Complete the following setup tasks. Note that the values in parentheses reference the configurable settings from the script.

  1. Create an Amazon S3 bucket (s3_bucket_name).

  2. Identify a crawler target (folder_name, such as "test1") which is a path in the identified bucket.

  3. Prepare a crawler name (crawler_name).

  4. Prepare an SNS topic name (sns_topic_name), which can be the same as the crawler name.

  5. Prepare the AWS Region where the crawler is to run and the S3 bucket exists (region).

  6. Optionally, prepare an email address if email is used to receive the Amazon S3 events (subscribing_email).

You can also use the CloudFormation stack to create your resources. Complete the following steps (a boto3 equivalent of the stack launch is sketched after the list):

  1. Launch your CloudFormation stack in US East (N. Virginia).

  2. Under Parameters, enter a name for your Amazon S3 bucket (include your account number).

  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.

  4. Choose Create stack.
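
A minimal boto3 sketch of the same stack launch might look like the following. The stack name, template URL, parameter key, and bucket name are all hypothetical placeholders; the actual template comes from the console launch link.

```python
import boto3

cfn = boto3.client("cloudformation", region_name="us-east-1")

# Stack name, template URL, parameter key, and bucket name below are hypothetical placeholders.
cfn.create_stack(
    StackName="glue-s3-event-crawler-resources",
    TemplateURL="https://example-bucket.s3.amazonaws.com/s3-event-crawler-template.yaml",
    Parameters=[
        {"ParameterKey": "BucketName", "ParameterValue": "my-s3-event-bucket-123456789012"},
    ],
    # Acknowledges that the stack may create IAM resources with custom names.
    Capabilities=["CAPABILITY_NAMED_IAM"],
)
```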

Limitations:

  • The crawler supports only a single target, whether it is an Amazon S3 or Data Catalog target.

  • SQS on private VPC is not supported.

  • Amazon S3 sampling is not supported.

  • The crawler target should be a folder for an Amazon S3 target, or one or more AWS Glue Data Catalog tables for a Data Catalog target.

  • The 'everything' path wildcard is not supported: s3://%

  • For a Data Catalog target, all catalog tables should point to the same Amazon S3 bucket for Amazon S3 event mode.

  • For a Data Catalog target, a catalog table should not point to an Amazon S3 location in the Delta Lake format (identified by _symlink folders or by checking the catalog table's InputFormat).

To use the Amazon S3 event based crawler, enable event notification on the S3 bucket with events filtered by a prefix that is the same as the S3 target, and store them in SQS. You can set up SQS and event notification through the console by following the steps in Walkthrough: Configuring a bucket for notifications or using the Script to generate SQS and configure Amazon S3 events from the target.
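
For the case where events go directly to SQS (no SNS fan-out), the following is a minimal boto3 sketch that enables the bucket notification with a prefix filter. The bucket name, prefix, and queue ARN are hypothetical placeholders, and the queue policy must already allow Amazon S3 to send messages (as shown in the AWS CLI example later in this topic).

```python
import boto3

s3 = boto3.client("s3", region_name="us-west-2")

# Bucket name, prefix, and queue ARN below are hypothetical placeholders.
s3.put_bucket_notification_configuration(
    Bucket="my-s3-event-bucket",
    NotificationConfiguration={
        "QueueConfigurations": [
            {
                "Id": "s3-event-crawler-notifications",
                "QueueArn": "arn:aws:sqs:us-west-2:123456789012:my-s3-event-queue",
                "Events": ["s3:ObjectCreated:*", "s3:ObjectRemoved:*"],
                "Filter": {
                    "Key": {"FilterRules": [{"Name": "prefix", "Value": "test1/"}]}
                },
            }
        ]
    },
)
```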

SQS policy

Attach the following SQS policy to the IAM role used by the crawler.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "arn:aws:sqs:{region}:{accountID}:cfn-sqs-queue" } ] }

Script to generate SQS and configure Amazon S3 events from the target

After ensuring the prerequisites are met, you can run the following Python script to create the SQS queue. Replace the configurable settings with the names prepared in the prerequisites.

Note

After running the script, log in to the SQS console to find the ARN of the SQS queue that was created.

Amazon SQS sets a visibility timeout, a period of time during which Amazon SQS prevents other consumers from receiving and processing the message. Set the visibility timeout approximately equal to the crawl run time.
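
As a minimal sketch, assuming a hypothetical queue URL and an approximate crawl run time of 30 minutes, you might set the visibility timeout like this:

```python
import boto3

sqs = boto3.client("sqs", region_name="us-west-2")

# Hypothetical queue URL; 1800 seconds (30 minutes) approximates the crawl run time.
sqs.set_queue_attributes(
    QueueUrl="https://sqs.us-west-2.amazonaws.com/123456789012/my-s3-event-queue",
    Attributes={"VisibilityTimeout": "1800"},
)
```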

#!venv/bin/python
import boto3
import botocore

#---------Start : READ ME FIRST ----------------------#
# 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification.
#    The following are the operations performed by the scripts:
#     a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events.
#     b. Create SNS topic for fan out.
#     c. Create SQS queue for saving events which will be consumed by the crawler.
#        SQS Event Queue ARN will be used to create the crawler after running the script.
# 2. This script does not create the crawler.
# 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another
#    purpose, SNS topic created by the script can be used.
# 1. Creation of bucket is an optional step.
#    To create a bucket set create_bucket variable to true.
# 2. The purpose of crawler_name is to easily locate the SQS/SNS.
#    crawler_name is used to create SQS and SNS with the same name as crawler.
# 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name'
#
#---------End : READ ME FIRST ------------------------#

#--------------------------------#
# Start : Configurable settings  #
#--------------------------------#

# Create
region = 'us-west-2'
s3_bucket_name = 's3eventtestuswest2'
folder_name = "test"
crawler_name = "test33S3Event"
sns_topic_name = crawler_name
sqs_queue_name = sns_topic_name
create_bucket = False

#-------------------------------#
# End : Configurable settings   #
#-------------------------------#

# Define aws clients
dev = boto3.session.Session(profile_name='myprofile')
boto3.setup_default_session(profile_name='myprofile')
s3 = boto3.resource('s3', region_name=region)
sns = boto3.client('sns', region_name=region)
sqs = boto3.client('sqs', region_name=region)
client = boto3.client("sts")
account_id = client.get_caller_identity()["Account"]
queue_arn = ""


def print_error(e):
    # ClientError details live in e.response (there is no e.message attribute in Python 3).
    print(e.response['Error']['Message'] + ' RequestId: ' + e.response['ResponseMetadata']['RequestId'])


def create_s3_bucket(bucket_name, client):
    bucket = client.Bucket(bucket_name)
    try:
        if not create_bucket:
            return True
        response = bucket.create(
            ACL='private',
            CreateBucketConfiguration={
                'LocationConstraint': region
            },
        )
        return True
    except botocore.exceptions.ClientError as e:
        print_error(e)
        if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
            # we own this bucket so continue
            print('We own the bucket already. Lets continue...')
            return True
    return False


def create_s3_bucket_folder(bucket_name, client, directory_name):
    s3.meta.client.put_object(Bucket=bucket_name, Key=(directory_name + '/'))


def set_s3_notification_sns(bucket_name, client, topic_arn):
    bucket_notification = client.BucketNotification(bucket_name)
    try:
        response = bucket_notification.put(
            NotificationConfiguration={
                'TopicConfigurations': [
                    {
                        'Id': crawler_name,
                        'TopicArn': topic_arn,
                        'Events': [
                            's3:ObjectCreated:*',
                            's3:ObjectRemoved:*',
                        ],
                        'Filter': {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}}
                    },
                ]
            }
        )
        return True
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return False


def create_sns_topic(topic_name, client):
    try:
        response = client.create_topic(
            Name=topic_name
        )
        return response['TopicArn']
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return None


def set_sns_topic_policy(topic_arn, client, bucket_name):
    try:
        response = client.set_topic_attributes(
            TopicArn=topic_arn,
            AttributeName='Policy',
            AttributeValue='''{
                "Version": "2008-10-17",
                "Id": "s3-publish-to-sns",
                "Statement": [{
                    "Effect": "Allow",
                    "Principal": { "AWS" : "*" },
                    "Action": [ "SNS:Publish" ],
                    "Resource": "%s",
                    "Condition": {
                        "StringEquals": { "AWS:SourceAccount": "%s" },
                        "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" }
                    }
                }]
            }''' % (topic_arn, account_id, bucket_name)
        )
        return True
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return False


def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint):
    try:
        response = client.subscribe(
            TopicArn=topic_arn,
            Protocol=protocol,
            Endpoint=endpoint
        )
        return response['SubscriptionArn']
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return None


def create_sqs_queue(queue_name, client):
    try:
        response = client.create_queue(
            QueueName=queue_name,
        )
        return response['QueueUrl']
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return None


def get_sqs_queue_arn(queue_url, client):
    try:
        response = client.get_queue_attributes(
            QueueUrl=queue_url,
            AttributeNames=[
                'QueueArn',
            ]
        )
        return response['Attributes']['QueueArn']
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return None


def set_sqs_policy(queue_url, queue_arn, client, topic_arn):
    try:
        response = client.set_queue_attributes(
            QueueUrl=queue_url,
            Attributes={
                'Policy': '''{
                    "Version": "2012-10-17",
                    "Id": "AllowSNSPublish",
                    "Statement": [
                        {
                            "Sid": "AllowSNSPublish01",
                            "Effect": "Allow",
                            "Principal": "*",
                            "Action": "SQS:SendMessage",
                            "Resource": "%s",
                            "Condition": {
                                "ArnEquals": { "aws:SourceArn": "%s" }
                            }
                        }
                    ]
                }''' % (queue_arn, topic_arn)
            }
        )
        return True
    except botocore.exceptions.ClientError as e:
        print_error(e)
        return False


if __name__ == "__main__":
    print('Creating S3 bucket %s.' % s3_bucket_name)
    if create_s3_bucket(s3_bucket_name, s3):
        print('\nCreating SNS topic %s.' % sns_topic_name)
        topic_arn = create_sns_topic(sns_topic_name, sns)
        if topic_arn:
            print('SNS topic created successfully: %s' % topic_arn)

            print('Creating SQS queue %s' % sqs_queue_name)
            queue_url = create_sqs_queue(sqs_queue_name, sqs)
            if queue_url is not None:
                print('Subscribing sqs queue with sns.')
                queue_arn = get_sqs_queue_arn(queue_url, sqs)
                if queue_arn is not None:
                    if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn):
                        print('Successfully configured queue policy.')
                        subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn)
                        if subscription_arn is not None:
                            if 'pending confirmation' in subscription_arn:
                                print('Please confirm SNS subscription by visiting the subscribe URL.')
                            else:
                                print('Successfully subscribed SQS queue: ' + queue_arn)
                        else:
                            print('Failed to subscribe SNS')
                    else:
                        print('Failed to set queue policy.')
                else:
                    print("Failed to get queue arn for %s" % queue_url)
            # ------------ End subscriptions to SNS topic -----------------

            print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name)
            if set_sns_topic_policy(topic_arn, sns, s3_bucket_name):
                print('SNS topic policy added successfully.')
                if set_s3_notification_sns(s3_bucket_name, s3, topic_arn):
                    print('Successfully configured event for S3 bucket %s' % s3_bucket_name)
                    print('Create S3 Event Crawler using SQS ARN %s' % queue_arn)
                else:
                    print('Failed to configure S3 bucket notification.')
            else:
                print('Failed to add SNS topic policy.')
        else:
            print('Failed to create SNS topic.')

Setting up a crawler for Amazon S3 event notifications using the console (Amazon S3 target)

To set up a crawler for Amazon S3 event notifications using the AWS Glue console for an Amazon S3 target:

  1. Set your crawler properties. For more information, see Setting Crawler Configuration Options on the AWS Glue console.

  2. In the section Data source configuration, you are asked Is your data already mapped to AWS Glue tables?

    By default, Not yet is selected. Leave this as the default because you are using an Amazon S3 data source and the data is not already mapped to AWS Glue tables.

  3. In the section Data sources, choose Add a data source.

  4. In the Add data source modal, configure the Amazon S3 data source:

    • Data source: By default, Amazon S3 is selected.

    • Network connection (Optional): Choose Add new connection.

    • Location of Amazon S3 data: By default, In this account is selected.

    • Amazon S3 path: Specify the Amazon S3 path where folders and files are crawled.

    • Subsequent crawler runs: Choose Crawl based on events to use Amazon S3 event notifications for your crawler.

    • Include SQS ARN: Specify the data store parameters, including a valid SQS ARN. (For example, arn:aws:sqs:region:account:sqs).

    • Include dead-letter SQS ARN (Optional): Specify a valid Amazon SQS dead-letter queue ARN. (For example, arn:aws:sqs:region:account:deadLetterQueue).

    • Choose Add an Amazon S3 data source.

Setting up a crawler for Amazon S3 event notifications using the AWS CLI

The following is an example Amazon S3 AWS CLI call to create an SQS queue and set up event notifications on the Amazon S3 target bucket.

S3 Event AWS CLI

aws sqs create-queue --queue-name MyQueue --attributes file://create-queue.json

create-queue.json:

```
{
    "Policy": {
        "Version": "2012-10-17",
        "Id": "example-ID",
        "Statement": [
            {
                "Sid": "example-statement-ID",
                "Effect": "Allow",
                "Principal": {
                    "Service": "s3.amazonaws.com"
                },
                "Action": [
                    "SQS:SendMessage"
                ],
                "Resource": "SQS-queue-ARN",
                "Condition": {
                    "ArnLike": {
                        "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1"
                    },
                    "StringEquals": {
                        "aws:SourceAccount": "bucket-owner-account-id"
                    }
                }
            }
        ]
    }
}
```

aws s3api put-bucket-notification-configuration --bucket customer-data-pdx --notification-configuration file://s3-event-config.json

s3-event-config.json:

```
{
    "QueueConfigurations": [
        {
            "Id": "s3event-sqs-queue",
            "QueueArn": "arn:aws:sqs:{region}:{account}:queuename",
            "Events": [
                "s3:ObjectCreated:*",
                "s3:ObjectRemoved:*"
            ],
            "Filter": {
                "Key": {
                    "FilterRules": [
                        {
                            "Name": "Prefix",
                            "Value": "/json"
                        }
                    ]
                }
            }
        }
    ]
}
```

Create Crawler:
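
For the "Create Crawler" step, the following is a minimal boto3 sketch of creating an Amazon S3 event mode crawler. The crawler name, role, database, path, and queue ARNs are hypothetical placeholders.

```python
import boto3

glue = boto3.client("glue", region_name="us-west-2")

# All names, paths, and ARNs below are hypothetical placeholders.
glue.create_crawler(
    Name="my-s3-event-crawler",
    Role="AWSGlueServiceRole-S3EventCrawler",
    DatabaseName="my_database",
    Targets={
        "S3Targets": [
            {
                "Path": "s3://my-s3-event-bucket/test1/",
                "EventQueueArn": "arn:aws:sqs:us-west-2:123456789012:my-s3-event-queue",
                "DlqEventQueueArn": "arn:aws:sqs:us-west-2:123456789012:my-dead-letter-queue",
            }
        ]
    },
    # CRAWL_EVENT_MODE tells the crawler to use Amazon S3 events on subsequent runs.
    RecrawlPolicy={"RecrawlBehavior": "CRAWL_EVENT_MODE"},
)
```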

Setting up a crawler for Amazon S3 event notifications using the console (Data Catalog target)

When you have a Data Catalog target, set up a crawler for Amazon S3 event notifications using the AWS Glue console (a boto3 sketch of the equivalent API call follows these steps):

  1. Set your crawler properties. For more information, see Setting Crawler Configuration Options on the AWS Glue console.

  2. In the section Data source configuration, you are asked Is your data already mapped to AWS Glue tables?

    Select Yes to select existing tables from your Data Catalog as your data source.

  3. In the section Glue tables, choose Add tables.

  4. In the Add table modal, configure the database and tables:

    • Network connection (Optional): Choose Add new connection.

    • Database: Select a database in the Data Catalog.

    • Tables: Select one or more tables from that database in the Data Catalog.

    • Subsequent crawler runs: Choose Crawl based on events to use Amazon S3 event notifications for your crawler.

    • Include SQS ARN: Specify the data store parameters, including a valid SQS ARN. (For example, arn:aws:sqs:region:account:sqs).

    • Include dead-letter SQS ARN (Optional): Specify a valid Amazon SQS dead-letter queue ARN. (For example, arn:aws:sqs:region:account:deadLetterQueue).

    • Choose Confirm.
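
A minimal boto3 sketch of the equivalent API call for a Data Catalog target, again with hypothetical names and ARNs, might look like the following. The tables must already exist in the Data Catalog and point to the same Amazon S3 bucket.

```python
import boto3

glue = boto3.client("glue", region_name="us-west-2")

# All names and ARNs below are hypothetical placeholders.
glue.create_crawler(
    Name="my-catalog-event-crawler",
    Role="AWSGlueServiceRole-S3EventCrawler",
    Targets={
        "CatalogTargets": [
            {
                "DatabaseName": "my_database",
                "Tables": ["my_table"],
                "EventQueueArn": "arn:aws:sqs:us-west-2:123456789012:my-s3-event-queue",
                "DlqEventQueueArn": "arn:aws:sqs:us-west-2:123456789012:my-dead-letter-queue",
            }
        ]
    },
    # Catalog target crawlers require the delete behavior to be set to LOG.
    SchemaChangePolicy={"DeleteBehavior": "LOG"},
    RecrawlPolicy={"RecrawlBehavior": "CRAWL_EVENT_MODE"},
)
```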