Mempercepat crawl menggunakan notifikasi acara Amazon S3 - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mempercepat crawl menggunakan notifikasi acara Amazon S3

Alih-alih mencantumkan objek dari target Amazon S3 atau Katalog Data, Anda dapat mengonfigurasi crawler untuk menggunakan peristiwa Amazon S3 untuk menemukan perubahan apa pun. Fitur ini meningkatkan waktu rawl ulang dengan menggunakan peristiwa Amazon S3 untuk mengidentifikasi perubahan antara dua crawl dengan mencantumkan semua file dari subfolder yang memicu peristiwa alih-alih mencantumkan target Amazon S3 atau Katalog Data lengkap.

Crawl pertama mencantumkan semua objek Amazon S3 dari target. Setelah crawl pertama berhasil, Anda dapat memilih untuk meng-rawl ulang secara manual atau pada jadwal yang ditetapkan. Crawler hanya akan mencantumkan objek dari peristiwa tersebut alih-alih mencantumkan semua objek.

Keuntungan pindah ke crawler berbasis acara Amazon S3 adalah:

  • Recrawl lebih cepat karena daftar semua objek dari target tidak diperlukan, alih-alih daftar folder tertentu dilakukan di mana objek ditambahkan atau dihapus.

  • Pengurangan biaya crawl keseluruhan karena daftar folder tertentu dilakukan di mana objek ditambahkan atau dihapus.

Crawl peristiwa Amazon S3 berjalan dengan menggunakan peristiwa Amazon S3 dari antrean SQS berdasarkan jadwal crawler. Tidak akan ada biaya jika tidak ada acara dalam antrian. Acara Amazon S3 dapat dikonfigurasi untuk langsung masuk ke antrian SQS atau dalam kasus di mana beberapa konsumen memerlukan acara yang sama, kombinasi SNS dan SQS. Untuk informasi selengkapnya, lihat Menyiapkan Akun Anda untuk pemberitahuan acara Amazon S3.

Setelah membuat dan mengonfigurasi crawler dalam mode peristiwa, crawl pertama berjalan dalam mode daftar dengan melakukan daftar lengkap target Amazon S3 atau Katalog Data. Log berikut mengonfirmasi pengoperasian crawl dengan menggunakan peristiwa Amazon S3 setelah perayapan pertama yang berhasil: “Perayapan berjalan dengan menggunakan peristiwa Amazon S3.”

Setelah membuat crawl peristiwa Amazon S3 dan memperbarui properti crawler yang dapat memengaruhi perayapan, crawl beroperasi dalam mode daftar dan log berikut ditambahkan: “Perayapan tidak berjalan dalam mode peristiwa S3”.

Target katalog

Ketika targetnya adalah Katalog Data, crawler memperbarui tabel yang ada di Katalog Data dengan perubahan (misalnya, partisi tambahan dalam tabel).

Menyiapkan Akun Anda untuk pemberitahuan acara Amazon S3

Bagian ini menjelaskan cara mengatur akun Anda untuk pemberitahuan peristiwa Amazon S3, dan memberikan instruksi untuk melakukannya menggunakan skrip, atau konsol. AWS Glue

Prasyarat

Selesaikan tugas pengaturan berikut. Perhatikan nilai dalam tanda kurung merujuk pengaturan yang dapat dikonfigurasi dari skrip.

  1. Buat bucket Amazon S3 ()s3_bucket_name.

  2. Identifikasi target crawler (folder_name, seperti “test1") yang merupakan jalur di bucket yang diidentifikasi.

  3. Siapkan nama crawler () crawler_name

  4. Siapkan nama Topik SNS (sns_topic_name) yang bisa sama dengan nama crawler.

  5. Siapkan AWS Wilayah tempat crawler dijalankan dan bucket S3 ada ()region.

  6. Secara opsional siapkan alamat email jika email digunakan untuk mendapatkan acara subscribing_email Amazon S3 ().

Anda juga dapat menggunakan CloudFormation tumpukan untuk membuat sumber daya Anda. Selesaikan langkah-langkah berikut:

  1. Luncurkan CloudFormation tumpukan Anda di AS Timur (Virginia N.):

  2. Di bawah Parameter, masukkan nama untuk bucket Amazon S3 Anda (sertakan nomor akun Anda).

  3. Pilih I acknowledge that AWS CloudFormation might create IAM resources with custom names.

  4. Pilih Create stack.

Pembatasan:

  • Hanya satu target yang didukung oleh crawler, baik untuk target Amazon S3 atau Katalog Data.

  • SQS pada VPC pribadi tidak didukung.

  • Pengambilan sampel Amazon S3 tidak didukung.

  • Target crawler harus berupa folder untuk target Amazon S3, atau satu atau AWS Glue beberapa tabel Katalog Data untuk target Katalog Data.

  • Wildcard jalur 'semuanya' tidak didukung: s3: //%

  • Untuk target Katalog Data, semua tabel katalog harus mengarah ke bucket Amazon S3 yang sama untuk mode acara Amazon S3.

  • Untuk target Katalog Data, tabel katalog tidak boleh mengarah ke lokasi Amazon S3 dalam format Delta Lake (berisi folder _symlink, atau memeriksa tabel katalog). InputFormat

Untuk menggunakan crawler berbasis peristiwa Amazon S3, Anda harus mengaktifkan pemberitahuan peristiwa di bucket S3 dengan peristiwa yang difilter dari awalan yang sama dengan target S3 dan penyimpanan di SQS. Anda dapat mengatur SQS dan pemberitahuan acara melalui konsol dengan mengikuti langkah-langkah di Walkthrough: Mengonfigurasi bucket untuk notifikasi atau menggunakan. Skrip untuk menghasilkan SQS dan mengonfigurasi peristiwa Amazon S3 dari target

Kebijakan SQS

Tambahkan kebijakan SQS berikut yang harus dilampirkan ke peran yang digunakan oleh crawler.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "sqs:DeleteMessage", "sqs:GetQueueUrl", "sqs:ListDeadLetterSourceQueues", "sqs:ReceiveMessage", "sqs:GetQueueAttributes", "sqs:ListQueueTags", "sqs:SetQueueAttributes", "sqs:PurgeQueue" ], "Resource": "arn:aws:sqs:{region}:{accountID}:cfn-sqs-queue" } ] }

Skrip untuk menghasilkan SQS dan mengonfigurasi peristiwa Amazon S3 dari target

Setelah memastikan prasyarat terpenuhi, Anda dapat menjalankan skrip Python berikut untuk membuat SQS. Ganti pengaturan yang Dapat Dikonfigurasi dengan nama yang disiapkan dari prasyarat.

catatan

Setelah menjalankan skrip, masuk ke konsol SQS untuk menemukan ARN dari SQS yang dibuat.

Amazon SQS menetapkan batas waktu visibilitas, periode waktu di mana Amazon SQS mencegah konsumen lain menerima dan memproses pesan. Setel batas waktu visibilitas kira-kira sama dengan waktu proses crawl.

#!venv/bin/python import boto3 import botocore #---------Start : READ ME FIRST ----------------------# # 1. Purpose of this script is to create the SQS, SNS and enable S3 bucket notification. # The following are the operations performed by the scripts: # a. Enable S3 bucket notification to trigger 's3:ObjectCreated:' and 's3:ObjectRemoved:' events. # b. Create SNS topic for fan out. # c. Create SQS queue for saving events which will be consumed by the crawler. # SQS Event Queue ARN will be used to create the crawler after running the script. # 2. This script does not create the crawler. # 3. SNS topic is created to support FAN out of S3 events. If S3 event is also used by another # purpose, SNS topic created by the script can be used. # 1. Creation of bucket is an optional step. # To create a bucket set create_bucket variable to true. # 2. The purpose of crawler_name is to easily locate the SQS/SNS. # crawler_name is used to create SQS and SNS with the same name as crawler. # 3. 'folder_name' is the target of crawl inside the specified bucket 's3_bucket_name' # #---------End : READ ME FIRST ------------------------# #--------------------------------# # Start : Configurable settings # #--------------------------------# #Create region = 'us-west-2' s3_bucket_name = 's3eventtestuswest2' folder_name = "test" crawler_name = "test33S3Event" sns_topic_name = crawler_name sqs_queue_name = sns_topic_name create_bucket = False #-------------------------------# # End : Configurable settings # #-------------------------------# # Define aws clients dev = boto3.session.Session(profile_name='myprofile') boto3.setup_default_session(profile_name='myprofile') s3 = boto3.resource('s3', region_name=region) sns = boto3.client('sns', region_name=region) sqs = boto3.client('sqs', region_name=region) client = boto3.client("sts") account_id = client.get_caller_identity()["Account"] queue_arn = "" def print_error(e): print(e.message + ' RequestId: ' + e.response['ResponseMetadata']['RequestId']) def create_s3_bucket(bucket_name, client): bucket = client.Bucket(bucket_name) try: if not create_bucket: return True response = bucket.create( ACL='private', CreateBucketConfiguration={ 'LocationConstraint': region }, ) return True except botocore.exceptions.ClientError as e: print_error(e) if 'BucketAlreadyOwnedByYou' in e.message: # we own this bucket so continue print('We own the bucket already. Lets continue...') return True return False def create_s3_bucket_folder(bucket_name, client, directory_name): s3.put_object(Bucket=bucket_name, Key=(directory_name + '/')) def set_s3_notification_sns(bucket_name, client, topic_arn): bucket_notification = client.BucketNotification(bucket_name) try: response = bucket_notification.put( NotificationConfiguration={ 'TopicConfigurations': [ { 'Id' : crawler_name, 'TopicArn': topic_arn, 'Events': [ 's3:ObjectCreated:*', 's3:ObjectRemoved:*', ], 'Filter' : {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': folder_name}]}} }, ] } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def create_sns_topic(topic_name, client): try: response = client.create_topic( Name=topic_name ) return response['TopicArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sns_topic_policy(topic_arn, client, bucket_name): try: response = client.set_topic_attributes( TopicArn=topic_arn, AttributeName='Policy', AttributeValue='''{ "Version": "2008-10-17", "Id": "s3-publish-to-sns", "Statement": [{ "Effect": "Allow", "Principal": { "AWS" : "*" }, "Action": [ "SNS:Publish" ], "Resource": "%s", "Condition": { "StringEquals": { "AWS:SourceAccount": "%s" }, "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:%s" } } }] }''' % (topic_arn, account_id, bucket_name) ) return True except botocore.exceptions.ClientError as e: print_error(e) return False def subscribe_to_sns_topic(topic_arn, client, protocol, endpoint): try: response = client.subscribe( TopicArn=topic_arn, Protocol=protocol, Endpoint=endpoint ) return response['SubscriptionArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def create_sqs_queue(queue_name, client): try: response = client.create_queue( QueueName=queue_name, ) return response['QueueUrl'] except botocore.exceptions.ClientError as e: print_error(e) return None def get_sqs_queue_arn(queue_url, client): try: response = client.get_queue_attributes( QueueUrl=queue_url, AttributeNames=[ 'QueueArn', ] ) return response['Attributes']['QueueArn'] except botocore.exceptions.ClientError as e: print_error(e) return None def set_sqs_policy(queue_url, queue_arn, client, topic_arn): try: response = client.set_queue_attributes( QueueUrl=queue_url, Attributes={ 'Policy': '''{ "Version": "2012-10-17", "Id": "AllowSNSPublish", "Statement": [ { "Sid": "AllowSNSPublish01", "Effect": "Allow", "Principal": "*", "Action": "SQS:SendMessage", "Resource": "%s", "Condition": { "ArnEquals": { "aws:SourceArn": "%s" } } } ] }''' % (queue_arn, topic_arn) } ) return True except botocore.exceptions.ClientError as e: print_error(e) return False if __name__ == "__main__": print('Creating S3 bucket %s.' % s3_bucket_name) if create_s3_bucket(s3_bucket_name, s3): print('\nCreating SNS topic %s.' % sns_topic_name) topic_arn = create_sns_topic(sns_topic_name, sns) if topic_arn: print('SNS topic created successfully: %s' % topic_arn) print('Creating SQS queue %s' % sqs_queue_name) queue_url = create_sqs_queue(sqs_queue_name, sqs) if queue_url is not None: print('Subscribing sqs queue with sns.') queue_arn = get_sqs_queue_arn(queue_url, sqs) if queue_arn is not None: if set_sqs_policy(queue_url, queue_arn, sqs, topic_arn): print('Successfully configured queue policy.') subscription_arn = subscribe_to_sns_topic(topic_arn, sns, 'sqs', queue_arn) if subscription_arn is not None: if 'pending confirmation' in subscription_arn: print('Please confirm SNS subscription by visiting the subscribe URL.') else: print('Successfully subscribed SQS queue: ' + queue_arn) else: print('Failed to subscribe SNS') else: print('Failed to set queue policy.') else: print("Failed to get queue arn for %s" % queue_url) # ------------ End subscriptions to SNS topic ----------------- print('\nSetting topic policy to allow s3 bucket %s to publish.' % s3_bucket_name) if set_sns_topic_policy(topic_arn, sns, s3_bucket_name): print('SNS topic policy added successfully.') if set_s3_notification_sns(s3_bucket_name, s3, topic_arn): print('Successfully configured event for S3 bucket %s' % s3_bucket_name) print('Create S3 Event Crawler using SQS ARN %s' % queue_arn) else: print('Failed to configure S3 bucket notification.') else: print('Failed to add SNS topic policy.') else: print('Failed to create SNS topic.')

Menyiapkan crawler untuk notifikasi peristiwa Amazon S3 menggunakan konsol (target Amazon S3)

Untuk menyiapkan crawler untuk notifikasi peristiwa Amazon S3 menggunakan konsol untuk AWS Glue target Amazon S3:

  1. Tetapkan properti crawler Anda. Untuk informasi selengkapnya, lihat Menyetel Opsi Konfigurasi Crawler di AWS Glue konsol.

  2. Di bagian Konfigurasi sumber data, Anda ditanya Apakah data Anda sudah dipetakan ke AWS Glue tabel?

    Secara default Belum dipilih. Biarkan ini sebagai default karena Anda menggunakan sumber data Amazon S3 dan data belum dipetakan ke tabel. AWS Glue

  3. Di bagian Sumber data, pilih Tambahkan sumber data.

  4. Dalam modal Tambah sumber data, konfigurasikan sumber data Amazon S3:

    • Sumber data: Secara default, Amazon S3 dipilih.

    • Koneksi jaringan (Opsional): Pilih Tambahkan koneksi baru.

    • Lokasi data Amazon S3: Secara default, Di akun ini dipilih.

    • Jalur Amazon S3: Tentukan jalur Amazon S3 tempat folder dan file dirayapi.

    • Perayap berikutnya berjalan: Pilih Crawl berdasarkan peristiwa untuk menggunakan notifikasi peristiwa Amazon S3 untuk crawler Anda.

    • Sertakan SQS ARN: Tentukan parameter penyimpanan data termasuk SQS ARN yang valid. (Misalnya,arn:aws:sqs:region:account:sqs).

    • Sertakan SQS ARN huruf mati (Opsional): Tentukan SQS ARN huruf mati Amazon yang valid. (Misalnya,arn:aws:sqs:region:account:deadLetterQueue).

    • Pilih Tambahkan sumber data Amazon S3.

Menyiapkan crawler untuk notifikasi peristiwa Amazon S3 menggunakan AWS CLI

Berikut ini adalah contoh AWS CLI panggilan Amazon S3 untuk membuat antrian SQS dan pemberitahuan acara penyiapan di bucket target Amazon S3.

S3 Event AWS CLI aws sqs create-queue --queue-name MyQueue --attributes file://create-queue.json create-queue.json ``` { "Policy": { "Version": "2012-10-17", "Id": "example-ID", "Statement": [ { "Sid": "example-statement-ID", "Effect": "Allow", "Principal": { "Service": "s3.amazonaws.com" }, "Action": [ "SQS:SendMessage" ], "Resource": "SQS-queue-ARN", "Condition": { "ArnLike": { "aws:SourceArn": "arn:aws:s3:*:*:awsexamplebucket1" }, "StringEquals": { "aws:SourceAccount": "bucket-owner-account-id" } } } ] } } ``` aws s3api put-bucket-notification-configuration --bucket customer-data-pdx --notification-configuration file://s3-event-config.json s3-event-config.json ``` { "QueueConfigurations": [ { "Id": "s3event-sqs-queue", "QueueArn": "arn:aws:sqs:{region}:{account}:queuename", "Events": [ "s3:ObjectCreated:*", "s3:ObjectRemoved:*" ], "Filter": { "Key": { "FilterRules": [ { "Name": "Prefix", "Value": "/json" } ] } } } ] } ``` Create Crawler:

Menyiapkan crawler untuk notifikasi peristiwa Amazon S3 menggunakan konsol (Target Katalog Data)

Jika Anda memiliki target katalog, siapkan crawler untuk notifikasi peristiwa Amazon S3 menggunakan AWS Glue konsol:

  1. Tetapkan properti crawler Anda. Untuk informasi selengkapnya, lihat Menyetel Opsi Konfigurasi Crawler di AWS Glue konsol.

  2. Di bagian Konfigurasi sumber data, Anda ditanya Apakah data Anda sudah dipetakan ke AWS Glue tabel?

    Pilih Ya untuk memilih tabel yang ada dari Katalog Data Anda sebagai sumber data Anda.

  3. Di bagian Glue tables, pilih Add tables.

  4. Dalam modal Tambahkan tabel, konfigurasikan database dan tabel:

    • Koneksi jaringan (Opsional): Pilih Tambahkan koneksi baru.

    • Database: Pilih database di Katalog Data.

    • Tabel: Pilih satu atau beberapa tabel dari database tersebut di Katalog Data.

    • Perayap berikutnya berjalan: Pilih Crawl berdasarkan peristiwa untuk menggunakan notifikasi peristiwa Amazon S3 untuk crawler Anda.

    • Sertakan SQS ARN: Tentukan parameter penyimpanan data termasuk SQS ARN yang valid. (Misalnya,arn:aws:sqs:region:account:sqs).

    • Sertakan SQS ARN huruf mati (Opsional): Tentukan SQS ARN huruf mati Amazon yang valid. (Misalnya,arn:aws:sqs:region:account:deadLetterQueue).

    • Pilih Konfirmasi.