使用 Amazon S3 事件通知加速网络爬取 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon S3 事件通知加速网络爬取

您可以将爬网程序配置为使用 Amazon S3 事件来查找任何更改,而不是列出 Amazon S3 或 Data Catalog 目标中的对象。此功能使用 Amazon S3 事件,通过列出触发事件的子文件夹中的所有文件,而不是列出完整的 Amazon S3 或 Data Catalog 目标,来识别两次网络爬取之间的更改,从而缩短了重新爬网时间。

第一次网络爬取会列出目标中的所有 Amazon S3 对象。第一次成功爬取之后,您可以选择手动重新爬取或按设定计划重新爬取。爬网程序只会列出这些事件中的对象,而不会列出所有对象。

迁移到基于 Amazon S3 事件的爬网程序的优点包括:

  • 不需要列出目标中的所有对象,而是在添加或删除对象的位置列出特定文件夹,从而重新爬网更快。

  • 在添加或删除对象的位置列出特定文件夹,从而降低总体网络爬取成本。

Amazon S3 事件网络爬取基于爬网程序调度,从 SQS 队列中使用 Amazon S3 事件来运行。如果队列中没有事件,则无需支付费用。Amazon S3 事件可以配置为直接进入 SQS 队列,或者在多个用户需要相同事件的情况下,也可以配置为 SNS 和 SQS 的组合。有关更多信息,请参阅 为 Amazon S3 事件通知设置账户

在事件模式下创建和配置爬网程序之后,第一次网络爬取将在列表模式下通过执行 Amazon S3 或 Data Catalog 目标的完整列表来运行。第一次成功网络爬取后将使用 Amazon S3 事件,以下日志可确认网络爬取的运行:“网络爬取正在使用 Amazon S3 事件来运行。”

创建 Amazon S3 事件网络爬取并更新可能影响网络爬取的爬网程序属性后,网络爬取将在列表模式下运行,并添加以下日志:“网络爬取未在 S3 事件模式下运行”。

注意

每次爬取使用的最大消息数为 10000。

目录目标

当目标为 Data Catalog 时,爬网程序会利用更改(例如,表中的额外分区)更新 Data Catalog 中的现有表。

为 Amazon S3 事件通知设置账户

本节介绍如何为 Amazon S3 事件通知设置账户,并提供使用脚本或 AWS Glue 控制台执行此操作的说明。

先决条件

完成以下设置任务。请注意,括号中的值引用了脚本中的可配置设置。

  1. 创建 Amazon S3 存储桶 (s3_bucket_name)。

  2. 识别爬网程序目标(folder_name,例如“test1”),即识别的存储桶中的路径。

  3. 准备爬网程序名称 (crawler_name

  4. 准备 SNS 主题名称 (sns_topic_name),该名称可能与爬网程序名称相同。

  5. 准备要运行爬网程序且存在 S3 存储桶的 AWS 区域 (region)。

  6. 如果使用电子邮件获取 Amazon S3 事件,则可以选择准备电子邮件地址 (subscribing_email)。

您也可以使用 CloudFormation 堆栈创建您的资源。完成以下步骤:

  1. 在美国东部(弗吉尼亚州北部)启动您的 CloudFormation 堆栈:

  2. 在 Parameters 下方中输入 Amazon S3 存储桶的名称(包括您的账号)。

  3. 选择 I acknowledge that AWS CloudFormation might create IAM resources with custom names

  4. 选择Create stack

限制:

  • 无论是 Amazon S3 还是 Data Catalog 目标,爬网程序仅支持单个目标。

  • 不支持私有 VPC 上的 SQS。

  • 不支持 Amazon S3 采样。

  • 爬网程序目标应为 Amazon S3 目标的文件夹,或者 Data Catalog 目标的一个或多个 AWS Glue Data Catalog 表。

  • 不支持“所有”路径通配符:s3://%

  • 对于 Data Catalog 目标,所有目录表都应指向 Amazon S3 事件模式的同一 Amazon S3 存储桶。

  • 对于 Data Catalog 目标,目录表不应指向 Delta Lake 格式的 Amazon S3 位置(包含 _symlink 文件夹或检查目录表的 InputFormat)。

要使用基于 Amazon S3 事件的爬网程序,您应该在 S3 存储桶上启用事件通知,并使用从与 S3 目标相同的前缀中筛选的事件,并存储在 SQS 中。您可以按照演练:为存储桶配置通知中的步骤或使用 用于从目标生成 SQS 和配置 Amazon S3 事件的脚本 通过控制台设置 SQS 和事件通知。

SQS 策略

添加以下 SQS 策略,需要附上爬网程序使用的角色。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "arn:aws:sqs:{region}:{accountID}:cfn-sqs-queue" } ] }

用于从目标生成 SQS 和配置 Amazon S3 事件的脚本

确保满足先决条件后,可以运行以下 Python 脚本来创建 SQS。将可配置设置替换为根据先决条件准备的名称。

注意

运行脚本后,登录到 SQS 控制台以查找创建的 SQS 的 ARN。

Amazon SQS 会设置可见性超时,即 Amazon SQS 阻止其他用户接收并处理消息的一段时间。将可见性超时设置为大致等于网络爬取运行时。

#!venv/bin/python import boto3 import botocore #---------Start : READ ME FIRST ----------------------# # 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification. # The following are the operations performed by the scripts: # a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events. # b. Create SNS topic for fan out. # c. Create SQS queue for saving events which will be consumed by the crawler. # SQS Event Queue ARN will be used to create the crawler after running the script. # 2. This script does not create the crawler. # 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another # purpose, SNS topic created by the script can be used. # 1. Creation of bucket is an optional step. # To create a bucket set create_bucket variable to true. # 2. The purpose of crawler_name is to easily locate the SQS/SNS. # crawler_name is used to create SQS and SNS with the same name as crawler. # 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name' # #---------End : READ ME FIRST ------------------------# #--------------------------------# # Start : Configurable settings # #--------------------------------# #Create region = 'us-west-2' s3_bucket_name = 's3eventtestuswest2' folder_name = "test" crawler_name = "test33S3Event" sns_topic_name = crawler_name sqs_queue_name = sns_topic_name create_bucket = False #-------------------------------# # End : Configurable settings # #-------------------------------# # Define aws clients dev = boto3.session.Session(profile_name='myprofile') boto3.setup_default_session(profile_name='myprofile') s3 = boto3.resource('s3', region_name=region) sns = boto3.client('sns', region_name=region) sqs = boto3.client('sqs', region_name=region) client = boto3.client("sts") account_id = client.get_caller_identity()["Account"] queue_arn = "" def print_error(e): print(e.message + ' RequestId: ' + e.response['ResponseMetadata']['RequestId']) def create_s3_bucket(bucket_name, client): bucket = client.Bucket(bucket_name) try: if not create_bucket: return True response = bucket.create( ACL='private', CreateBucketConfiguration={ 'LocationConstraint': region }, ) return True except botocore.exceptions.ClientError as e: print_error(e) if 'BucketAlreadyOwnedByYou' in e.message: # we own this bucket so continue print('We own the bucket already. Lets continue...') return True return False def create_s3_bucket_folder(bucket_name, client, directory_name): s3.put_object(Bucket=bucket_name, Key=(directory_name + '/')) def set_s3_notification_sns(bucket_name, client, topic_arn): bucket_notification = client.BucketNotification(bucket_name) try: response = bucket_notification.put( NotificationConfiguration={ 'TopicConfigurations': [ { 'Id' : crawler_name, 'TopicArn': topic_arn, 'Events': [ 's3:ObjectCreated:*', 's3:ObjectRemoved:*', ], 'Filter' : {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}} }, ] } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def create_sns_topic(topic_name, client): try: response = client.create_topic( Name=topic_name ) return response['TopicArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sns_topic_policy(topic_arn, client, bucket_name): try: response = client.set_topic_attributes( TopicArn=topic_arn, AttributeName='Policy', AttributeValue='''{ "Version": "2008-10-17", "Id": "s3-publish-to-sns", "Statement": [{ "Effect": "Allow", "Principal": { "AWS" : "*" }, "Action": [ "SNS:Publish" ], "Resource": "%s", "Condition": { "StringEquals": { "AWS:SourceAccount": "%s" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" } } }] }''' % (topic_arn, account_id, bucket_name) ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint): try: response = client.subscribe( TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint ) return response['SubscriptionArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def create_sqs_queue(queue_name, client): try: response = client.create_queue( QueueName=queue_name, ) return response['QueueUrl'] except botocore.exceptions.ClientError as e: print_error(e) return None def get_sqs_queue_arn(queue_url, client): try: response = client.get_queue_attributes( QueueUrl=queue_url, AttributeNames=[ 'QueueArn', ] ) return response['Attributes']['QueueArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sqs_policy(queue_url, queue_arn, client, topic_arn): try: response = client.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': '''{ "Version": "2012-10-17", "Id": "AllowSNSPublish", "Statement": [ { "Sid": "AllowSNSPublish01", "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": "%s", "Condition": { "ArnEquals": { "aws:SourceArn": "%s" } } } ] }''' % (queue_arn, topic_arn) } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False if __name__ == "__main__": print('Creating S3 bucket %s.' % s3_bucket_name) if create_s3_bucket(s3_bucket_name, s3): print('\nCreating SNS topic %s.' % sns_topic_name) topic_arn = create_sns_topic(sns_topic_name, sns) if topic_arn: print('SNS topic created successfully: %s' % topic_arn) print('Creating SQS queue %s' % sqs_queue_name) queue_url = create_sqs_queue(sqs_queue_name, sqs) if queue_url is not None: print('Subscribing sqs queue with sns.') queue_arn = get_sqs_queue_arn(queue_url, sqs) if queue_arn is not None: if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn): print('Successfully configured queue policy.') subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn) if subscription_arn is not None: if 'pending confirmation' in subscription_arn: print('Please confirm SNS subscription by visiting the subscribe URL.') else: print('Successfully subscribed SQS queue: ' + queue_arn) else: print('Failed to subscribe SNS') else: print('Failed to set queue policy.') else: print("Failed to get queue arn for %s" % queue_url) # ------------ End subscriptions to SNS topic ----------------- print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name) if set_sns_topic_policy(topic_arn, sns, s3_bucket_name): print('SNS topic policy added successfully.') if set_s3_notification_sns(s3_bucket_name, s3, topic_arn): print('Successfully configured event for S3 bucket %s' % s3_bucket_name) print('Create S3 Event Crawler using SQS ARN %s' % queue_arn) else: print('Failed to configure S3 bucket notification.') else: print('Failed to add SNS topic policy.') else: print('Failed to create SNS topic.')

使用控制台为 Amazon S3 事件通知设置爬网程序(Amazon S3 目标)

要针对 Amazon S3 目标使用 AWS Glue 控制台为 Amazon S3 事件通知设置爬网程序,请执行以下操作:

  1. 设置爬网程序属性。有关更多信息,请参阅在 AWS Glue 控制台上设置爬网程序配置选项

  2. 数据来源配置部分中,系统将询问您的数据是否已映射到 AWS Glue 表?

    默认情况下已选择 Not yet(尚未)。请将其保留为默认值,这是因为您使用的是 Amazon S3 数据来源,而该数据尚未映射到 AWS Glue 表。

  3. Data sources(数据来源)部分中,选择 Add a data source(添加数据来源)。

  4. Add data source(添加数据来源)模态中,配置 Amazon S3 数据来源:

    • Data source(数据来源):默认选择 Amazon S3。

    • Network connection(网络连接)(可选):选择 Add new connection(添加新连接)。

    • Location of Amazon S3 data(Amazon S3 数据位置):默认选择 In this account(此账户中)。

    • Amazon S3 path(Amazon S3 路径):指定在其中爬取文件夹和文件的 Amazon S3 路径。

    • Subsequent crawler runs(后续爬网程序运行):选择 Crawl based on events(基于事件爬取)以对爬网程序使用 Amazon S3 事件通知。

    • Include SQS ARN(包含 SQS ARN):指定数据存储参数,包括有效的 SQS ARN。(例如,arn:aws:sqs:region:account:sqs)。

    • Include dead-letter SQS ARN(包含死信 SQS ARN)(可选):指定有效的 Amazon 死信 SQS ARN。(例如,arn:aws:sqs:region:account:deadLetterQueue)。

    • 选择 Add an Amazon S3 data source(添加 Amazon S3 数据来源)。

使用 AWS CLI 为 Amazon S3 事件通知设置爬网程序

以下是在 Amazon S3 目标存储桶上创建 SQS 队列和设置事件通知的 Amazon S3 AWS CLI 调用示例。

S3 Event AWS CLI aws sqs create-queue --queue-name MyQueue --attributes file://create-queue.json create-queue.json ``` { "Policy": { "Version": "2012-10-17", "Id": "example-ID", "Statement": [ { "Sid": "example-statement-ID", "Effect": "Allow", "Principal": { "Service": "s3.amazonaws.com" }, "Action": [ "SQS:SendMessage" ], "Resource": "SQS-queue-ARN", "Condition": { "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1" }, "StringEquals": { "aws:SourceAccount": "bucket-owner-account-id" } } } ] } } ``` aws s3api put-bucket-notification-configuration --bucket customer-data-pdx --notification-configuration file://s3-event-config.json s3-event-config.json ``` { "QueueConfigurations": [ { "Id": "s3event-sqs-queue", "QueueArn": "arn:aws:sqs:{region}:{account}:queuename", "Events": [ "s3:ObjectCreated:*", "s3:ObjectRemoved:*" ], "Filter": { "Key": { "FilterRules": [ { "Name": "Prefix", "Value": "/json" } ] } } } ] } ``` Create Crawler:

使用控制台为 Amazon S3 事件通知设置爬网程序(Data Catalog 目标)

当您有目录目标时,请使用 AWS Glue 控制台为 Amazon S3 事件通知设置爬网程序:

  1. 设置爬网程序属性。有关更多信息,请参阅在 AWS Glue 控制台上设置爬网程序配置选项

  2. 数据来源配置部分中,系统将询问您的数据是否已映射到 AWS Glue 表?

    选择 Yes(是),从 Data Catalog 中选择现有表作为数据来源。

  3. Glue tables(Glue 表)部分中,选择 Add tables(添加表)。

  4. Add table(添加表)模式中,配置数据库和表:

    • Network connection(网络连接)(可选):选择 Add new connection(添加新连接)。

    • Database(数据库):在 Data Catalog 中选择数据库。

    • Tables(表):在 Data Catalog 中选择该数据库中的一个或多个表。

    • Subsequent crawler runs(后续爬网程序运行):选择 Crawl based on events(基于事件爬取)以对爬网程序使用 Amazon S3 事件通知。

    • Include SQS ARN(包含 SQS ARN):指定数据存储参数,包括有效的 SQS ARN。(例如,arn:aws:sqs:region:account:sqs)。

    • Include dead-letter SQS ARN(包含死信 SQS ARN)(可选):指定有效的 Amazon 死信 SQS ARN。(例如,arn:aws:sqs:region:account:deadLetterQueue)。

    • 选择确认