Memuat data streaming ke OpenSearch Layanan Amazon - OpenSearch Layanan Amazon

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memuat data streaming ke OpenSearch Layanan Amazon

Anda dapat menggunakan OpenSearch Ingestion untuk langsung memuat data streaming ke domain OpenSearch Layanan Amazon Anda, tanpa perlu menggunakan solusi pihak ketiga. Untuk mengirim data ke OpenSearch Ingestion, Anda mengonfigurasi produsen data dan layanan secara otomatis mengirimkan data ke domain atau koleksi yang Anda tentukan. Untuk memulai dengan OpenSearch Ingestion, lihat. Tutorial: Menelan data ke dalam koleksi menggunakan Amazon OpenSearch Ingestion

Anda masih dapat menggunakan sumber lain untuk memuat data streaming, seperti Amazon Data Firehose dan Amazon CloudWatch Logs, yang memiliki dukungan bawaan untuk OpenSearch Layanan. Lainnya, seperti Amazon S3, Amazon Kinesis Data Streams, dan Amazon DynamoDB, gunakan fungsi AWS Lambda sebagai event handler. Fungsi Lambda menanggapi data baru dengan memproses dan streaming data itu ke domain Anda.

catatan

Lambda mendukung beberapa bahasa pemrograman populer dan tersedia di sebagian besar. Wilayah AWSUntuk informasi selengkapnya, lihat Memulai Lambda di PanduanAWS Lambda Pengembang dan titik akhirAWS layanan di. Referensi Umum AWS

Memuat data streaming dari OpenSearch Ingestion

Anda dapat menggunakan Amazon OpenSearch Ingestion untuk memuat data ke domain OpenSearch Layanan. Anda mengonfigurasi produsen data Anda untuk mengirim data ke OpenSearch Ingestion, dan secara otomatis mengirimkan data ke koleksi yang Anda tentukan. Anda juga dapat mengonfigurasi OpenSearch Ingestion untuk mengubah data Anda sebelum mengirimkannya. Untuk informasi selengkapnya, lihat OpenSearch Tertelan Amazon.

Memuat data streaming dari Amazon S3

Anda dapat menggunakan Lambda untuk mengirim data ke domain OpenSearch Layanan Anda dari Amazon S3. Data baru yang tiba di bucket S3 memicu notifikasi peristiwa untuk Lambda, yang kemudian menjalankan kode kustom Anda untuk melakukan pengindeksan.

Metode data streaming ini sangat fleksibel. Anda dapat mengindeks metadata objek, atau jika objek plaintext, mengurai dan mengindeks beberapa elemen dari tubuh objek. Bagian ini mencakup beberapa kode sampel Python yang kurang modern yang menggunakan ekspresi reguler untuk mengurai file log dan mengindeks kecocokan.

Prasyarat

Sebelum melanjutkan, Anda harus memiliki sumber daya berikut.

Prasyarat Deskripsi
Bucket Amazon S3 Untuk informasi selengkapnya, lihat Membuat bucket S3 pertama Anda di Panduan Pengguna Layanan Penyimpanan Sederhana Amazon. Bucket harus berada di Wilayah yang sama dengan domain OpenSearch Layanan Anda.
OpenSearch Domain layanan Tujuan untuk data setelah fungsi Lambda Anda memprosesnya. Untuk informasi selengkapnya, lihat Membuat domain OpenSearch Layanan.

Membuat paket deployment Lambda

Paket deployment adalah file ZIP atau JAR yang berisi kode Anda dan dependensinya. Bagian ini mencakup kode sampel Python. Untuk bahasa pemrograman lainnya, lihat Paket penyebaran Lambda di Panduan Pengembang.AWS Lambda

  1. Buatlah sebuah direktori. Dalam sampel ini, kita menggunakan nama s3-to-opensearch.

  2. Buat file dalam direktori bernamasample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Edit variabel untuk region dan host.

  3. Instal pip jika Anda belum melakukannya, lalu instal dependensi ke direktori baru: package

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    Semua lingkungan eksekusi Lambda telah menginstal Boto3, sehingga Anda tidak perlu memasukkannya ke dalam paket deployment.

  4. Paket kode aplikasi dan dependensi:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Buat fungsi Lambda

Setelah Anda membuat paket deployment, Anda dapat membuat fungsi Lambda. Saat Anda membuat fungsi, pilih nama, runtime (misalnya, Python 3.8), dan peran IAM. IAM role mendefinisikan izin untuk fungsi Anda. Untuk petunjuk mendetail, lihat Membuat fungsi Lambda dengan konsol di PanduanAWS Lambda Pengembang.

Contoh ini mengasumsikan Anda menggunakan konsol. Pilih Python 3.9 dan peran yang memiliki izin baca S3 dan izin menulis OpenSearch Layanan, seperti yang ditunjukkan pada gambar berikut:

Konfigurasi sampel untuk fungsi Lambda

Setelah Anda membuat fungsi, Anda harus menambahkan pemicu. Untuk contoh ini, kita ingin kode berjalan setiap kali file log tiba di bucket S3:

  1. Pilih Tambah pemicu dan pilih S3.

  2. Pilih bucket Anda.

  3. Untuk Jenis peristiwa, pilih TEMPATKAN.

  4. Untuk Prefiks, ketik logs/.

  5. Untuk Sufiks, ketik.log.

  6. Akui peringatan pemanggilan rekursif dan pilih Tambah.

Akhirnya, Anda dapat mengunggah paket deployment Anda:

  1. Pilih Unggah dari dan file.zip, lalu ikuti petunjuk untuk mengunggah paket penerapan Anda.

  2. Setelah unggahan selesai, edit pengaturan Runtime dan ubah Handler menjadi. sample.handler Pengaturan ini memberitahu Lambda file (sample.py) dan metode (handler) yang harus dijalankan setelah pemicu aktif.

Pada titik ini, Anda memiliki satu set lengkap sumber daya: bucket untuk file log, fungsi yang berjalan setiap kali file log ditambahkan ke bucket, kode yang melakukan penguraian dan pengindeksan, dan domain OpenSearch Layanan untuk pencarian dan visualisasi.

Menguji fungsi Lambda

Setelah Anda membuat fungsi, Anda dapat mengujinya dengan mengunggah file ke bucket Amazon S3. Buat file bernama sample.log dengan menggunakan baris log sampel berikut:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Unggah file ke folder logs dari bucket S3 Anda. Untuk petunjuk, lihat Mengunggah objek ke bucket Anda di Panduan Pengguna Layanan Penyimpanan Sederhana Amazon.

Kemudian gunakan konsol OpenSearch Layanan atau OpenSearch Dasbor untuk memverifikasi bahwa lambda-s3-index indeks berisi dua dokumen. Anda juga dapat membuat permintaan pencarian standar:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Memuat data streaming dari Amazon Kinesis Data Streams

Anda dapat memuat data streaming dari Kinesis Data OpenSearch Streams ke Layanan. Data baru yang tiba di aliran data memicu notifikasi peristiwa untuk Lambda, yang kemudian menjalankan kode kustom Anda untuk melakukan pengindeksan. Bagian ini mencakup kode sampel Python yang kurang modern.

Prasyarat

Sebelum melanjutkan, Anda harus memiliki sumber daya berikut.

Prasyarat Deskripsi
Amazon Kinesis Data Stream Sumber peristiwa untuk fungsi Lambda Anda. Untuk mempelajari selengkapnya, lihat Kinesis Data Streams.
OpenSearch Layanan Domain Tujuan untuk data setelah fungsi Lambda Anda memprosesnya. Lihat informasi yang lebih lengkap di Membuat domain OpenSearch Layanan
IAM Role

Peran ini harus memiliki izin dasar OpenSearch Layanan, Kinesis, dan Lambda, seperti berikut ini:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

Peran harus memiliki hubungan kepercayaan berikut:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Untuk mempelajari selengkapnya, lihat Membuat peran IAM di Panduan Pengguna IAM.

Buat fungsi Lambda

Ikuti instruksi di Membuat paket deployment Lambda, tapi buat sebuah direktori bernama kinesis-to-opensearch dan gunakan kode berikut untuk sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Edit variabel untuk region dan host.

Instal pip jika Anda belum melakukannya, maka gunakan perintah berikut untuk menginstal dependensi Anda:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Kemudian ikuti instruksi di Buat fungsi Lambda, namun tentukan IAM role dari Prasyarat dan pengaturan berikut untuk pemicu:

  • Pengaliran Kinesis: Pengaliran Kinesis Anda

  • Ukuran Batch: 100

  • Posisi awal: Potong cakrawala

Untuk mempelajari lebih lanjut, lihat Apa itu Amazon Kinesis Data Streams? di Panduan Pengembang Amazon Kinesis Data Streams.

Pada titik ini, Anda memiliki satu set lengkap sumber daya: aliran data Kinesis, fungsi yang berjalan setelah aliran menerima data baru dan mengindeks data tersebut, dan domain OpenSearch Layanan untuk pencarian dan visualisasi.

Uji Fungsi Lambda

Setelah Anda membuat fungsi, Anda dapat mengujinya dengan menambahkan catatan baru ke aliran data dengan menggunakan AWS CLI:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

Kemudian gunakan konsol OpenSearch Layanan atau OpenSearch Dasbor untuk memverifikasi bahwa lambda-kine-index berisi dokumen. Anda juga dapat menggunakan permintaan berikut:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Memuat data streaming dari Amazon DynamoDB

Anda dapat menggunakan AWS Lambda untuk mengirim data ke domain OpenSearch Layanan Anda dari Amazon DynamoDB. Data baru yang tiba di basis data memicu notifikasi peristiwa untuk Lambda, yang kemudian menjalankan kode kustom Anda untuk melakukan pengindeksan.

Prasyarat

Sebelum melanjutkan, Anda harus memiliki sumber daya berikut.

Prasyarat Deskripsi
Tabel DynamoDB

Tabel berisi data sumber Anda. Untuk informasi selengkapnya, lihat Operasi Dasar pada Tabel DynamoDB di Panduan Pengembang Amazon DynamoDB.

Tabel harus berada di Wilayah yang sama dengan domain OpenSearch Layanan Anda dan memiliki aliran yang disetel ke Gambar baru. Untuk mempelajari selengkapnya, lihat Mengaktifkan Pengaliran.

OpenSearch Domain layanan Tujuan untuk data setelah fungsi Lambda Anda memprosesnya. Untuk informasi selengkapnya, lihat Membuat domain OpenSearch Layanan.
IAM Role

Peran ini harus memiliki izin eksekusi OpenSearch Service, DynamoDB, dan Lambda dasar, seperti berikut ini:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

Peran harus memiliki hubungan kepercayaan berikut:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Untuk mempelajari selengkapnya, lihat Membuat peran IAM di Panduan Pengguna IAM.

Buat fungsi Lambda

Ikuti instruksi di Membuat paket deployment Lambda, tapi buat sebuah direktori bernama ddb-to-opensearch dan gunakan kode berikut untuk sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Edit variabel untuk region dan host.

Instal pip jika Anda belum melakukannya, maka gunakan perintah berikut untuk menginstal dependensi Anda:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Kemudian ikuti instruksi di Buat fungsi Lambda, namun tentukan IAM role dari Prasyarat dan pengaturan berikut untuk pemicu:

  • Tabel: Tabel DynamoDB Anda

  • Ukuran Batch: 100

  • Posisi awal: Potong cakrawala

Untuk mempelajari lebih lanjut, lihat Memproses Item Baru dengan DynamoDB Streams dan Lambda di Panduan Pengembang Amazon DynamoDB.

Pada titik ini, Anda memiliki satu set lengkap sumber daya: tabel DynamoDB untuk data sumber Anda, aliran perubahan DynamoDB pada tabel, fungsi yang berjalan setelah data sumber Anda berubah dan mengindeks perubahan tersebut, dan domain Layanan untuk pencarian dan visualisasi. OpenSearch

Tes fungsi Lambda

Setelah Anda membuat fungsi, Anda dapat mengujinya dengan menambahkan item baru ke tabel DynamoDB dengan menggunakan AWS CLI:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Kemudian gunakan konsol OpenSearch Layanan atau OpenSearch Dasbor untuk memverifikasi bahwa lambda-index berisi dokumen. Anda juga dapat menggunakan permintaan berikut:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Memuat data streaming dari Amazon Data Firehose

Firehose mendukung OpenSearch Layanan sebagai tujuan pengiriman. Untuk petunjuk tentang cara memuat data streaming ke OpenSearch Layanan, lihat Membuat Aliran Pengiriman Firehose Data Kinesis OpenSearch dan Memilih Layanan untuk Tujuan Anda di Panduan Pengembang Amazon Data Firehose.

Sebelum memuat data ke OpenSearch Layanan, Anda mungkin perlu melakukan transformasi pada data. Untuk mempelajari selengkapnya tentang penggunaan fungsi Lambda untuk menjalankan tugas ini, lihat Transformasi Data Amazon Kinesis Data Firehose dalam panduan yang sama.

Saat Anda mengonfigurasi aliran pengiriman, Firehose menampilkan peran IAM “sekali klik” yang memberikan akses sumber daya yang diperlukan untuk mengirim data ke OpenSearch Layanan, mencadangkan data di Amazon S3, dan mengubah data menggunakan Lambda. Karena kompleksitas yang terlibat dalam menciptakan peran semacam itu secara manual, sebaiknya gunakan peran yang disediakan.

Memuat data streaming dari Amazon CloudWatch

Anda dapat memuat data streaming dari CloudWatch Log ke domain OpenSearch Layanan Anda dengan menggunakan langganan CloudWatch Log. Untuk informasi tentang CloudWatch langganan Amazon, lihat Pemrosesan data log secara real-time dengan langganan. Untuk informasi konfigurasi, lihat data Streaming CloudWatch Log ke OpenSearch Layanan Amazon di Panduan CloudWatch Pengembang Amazon.

Memuat data streaming dari AWS IoT

Anda dapat mengirim data dari AWS IoT menggunakan aturan. Untuk mempelajari lebih lanjut, lihat OpenSearchtindakan di PanduanAWS IoT Pengembang.