Menggunakan AWS SDKs untuk berinteraksi dengan Amazon OpenSearch Ingestion - OpenSearch Layanan Amazon

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan AWS SDKs untuk berinteraksi dengan Amazon OpenSearch Ingestion

Bagian ini mencakup contoh cara menggunakan AWS SDKs untuk berinteraksi dengan Amazon OpenSearch Ingestion. Contoh kode menunjukkan cara membuat domain dan pipeline, dan kemudian menyerap data ke dalam pipeline.

Topik

Python

Contoh skrip berikut menggunakan AWS SDK for Python (Boto3)untuk membuat peran pipeline IAM, domain untuk menulis data, dan pipeline untuk menyerap data. Kemudian menyerap file log sampel ke dalam pipeline menggunakan perpustakaan requests HTTP.

Untuk menginstal dependensi yang diperlukan, jalankan perintah berikut:

pip install boto3 pip install botocore pip install requests pip install requests-auth-aws-sigv4

Dalam skrip, ganti akun IDs dalam kebijakan akses dengan Akun AWS ID Anda. Anda juga dapat secara opsional memodifikasi. region

import boto3 import botocore from botocore.config import Config import requests from requests_auth_aws_sigv4 import AWSSigV4 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default region. my_config = Config( # Optionally lets you specify a Region other than your default. region_name='us-east-1' ) opensearch = boto3.client('opensearch', config=my_config) iam = boto3.client('iam', config=my_config) osis = boto3.client('osis', config=my_config) domainName = 'test-domain' # The name of the domain pipelineName = 'test-pipeline' # The name of the pipeline def createPipelineRole(iam, domainName): """Creates the pipeline role""" response = iam.create_policy( PolicyName='pipeline-policy', PolicyDocument=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Action\":\"es:DescribeDomain\",\"Resource\":\"arn:aws:es:us-east-1:123456789012:domain\/{domainName}\"}},{{\"Effect\":\"Allow\",\"Action\":\"es:ESHttp*\",\"Resource\":\"arn:aws:es:us-east-1:123456789012:domain\/{domainName}\/*\"}}]}}' ) policyarn = response['Policy']['Arn'] response = iam.create_role( RoleName='PipelineRole', AssumeRolePolicyDocument='{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"osis-pipelines.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}' ) rolename=response['Role']['RoleName'] response = iam.attach_role_policy( RoleName=rolename, PolicyArn=policyarn ) print('Creating pipeline role...') time.sleep(10) print('Role created: ' + rolename) def createDomain(opensearch, domainName): """Creates a domain to ingest data into""" response = opensearch.create_domain( DomainName=domainName, EngineVersion='OpenSearch_2.3', ClusterConfig={ 'InstanceType': 't2.small.search', 'InstanceCount': 5, 'DedicatedMasterEnabled': True, 'DedicatedMasterType': 't2.small.search', 'DedicatedMasterCount': 3 }, # Many instance types require EBS storage. EBSOptions={ 'EBSEnabled': True, 'VolumeType': 'gp2', 'VolumeSize': 10 }, AccessPolicies=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Principal\":{{\"AWS\":\"arn:aws:iam::123456789012:role\/PipelineRole\"}},\"Action\":\"es:*\",\"Resource\":\"arn:aws:es:us-east-1:123456789012:domain\/{domainName}\/*\"}}]}}', NodeToNodeEncryptionOptions={ 'Enabled': True } ) return(response) def waitForDomainProcessing(opensearch, domainName): """Waits for the domain to be active""" try: response = opensearch.describe_domain( DomainName=domainName ) # Every 30 seconds, check whether the domain is processing. while 'Endpoint' not in response['DomainStatus']: print('Creating domain...') time.sleep(60) response = opensearch.describe_domain( DomainName=domainName) # Once we exit the loop, the domain is ready for ingestion. endpoint = response['DomainStatus']['Endpoint'] print('Domain endpoint ready to receive data: ' + endpoint) createPipeline(osis, endpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceNotFoundException': print('Domain not found.') else: raise error def createPipeline(osis, endpoint): """Creates a pipeline using the domain and pipeline role""" try: definition = f'version: \"2\"\nlog-pipeline:\n source:\n http:\n path: \"/${{pipelineName}}/logs\"\n processor:\n - date:\n from_time_received: true\n destination: \"@timestamp\"\n sink:\n - opensearch:\n hosts: [ \"https://{endpoint}\" ]\n index: \"application_logs\"\n aws:\n sts_role_arn: \"arn:aws:iam::123456789012:role/PipelineRole\"\n region: \"us-east-1\"' response = osis.create_pipeline( PipelineName=pipelineName, MinUnits=4, MaxUnits=9, PipelineConfigurationBody=definition ) response = osis.get_pipeline( PipelineName=pipelineName ) # Every 30 seconds, check whether the pipeline is active. while response['Pipeline']['Status'] == 'CREATING': print('Creating pipeline...') time.sleep(30) response = osis.get_pipeline( PipelineName=pipelineName) # Once we exit the loop, the pipeline is ready for ingestion. ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] print('Pipeline ready to ingest data at endpoint: ' + ingestionEndpoint) ingestData(ingestionEndpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceAlreadyExistsException': print('Pipeline already exists.') response = osis.get_pipeline( PipelineName=pipelineName ) ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] ingestData(ingestionEndpoint) else: raise error def ingestData(ingestionEndpoint): """Ingests a sample log file into the pipeline""" endpoint = 'https://' + ingestionEndpoint r = requests.request('POST', f'{endpoint}/log-pipeline/logs', data='[{"time":"2014-08-11T11:40:13+00:00","remote_addr":"122.226.223.69","status":"404","request":"GET http://www.k2proxy.com//hello.html HTTP/1.1","http_user_agent":"Mozilla/4.0 (compatible; WOW64; SLCC2;)"}]', auth=AWSSigV4('osis')) print('Ingesting sample log file into pipeline') print('Response: ' + r.text) def main(): createPipelineRole(iam, domainName) createDomain(opensearch, domainName) waitForDomainProcessing(opensearch, domainName) if __name__ == "__main__": main()