Utilizzo degli AWS SDK per interagire con Amazon Ingestion OpenSearch - OpenSearch Servizio Amazon

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo degli AWS SDK per interagire con Amazon Ingestion OpenSearch

Questa sezione include un esempio di come utilizzare gli AWS SDK per interagire con Amazon OpenSearch Ingestion. L'esempio di codice dimostra come creare un dominio e una pipeline e quindi inserire dati nella pipeline.

Argomenti

Python

Lo script di esempio seguente utilizza il ruolo della pipeline IAM AWS SDK for Python (Boto3)per creare un ruolo di pipeline IAM, un dominio in cui scrivere dati e una pipeline attraverso cui importare i dati. Quindi inserisce un file di registro di esempio nella pipeline utilizzando la libreria requests HTTP.

Per installare le dipendenze richieste, eseguire i seguenti comandi:

pip install boto3 pip install botocore pip install requests pip install requests-auth-aws-sigv4

All'interno dello script, sostituisci gli ID dell'account nelle politiche di accesso con il tuo Account AWS ID. Facoltativamente, è possibile anche modificare la region.

import boto3 import botocore from botocore.config import Config import requests from requests_auth_aws_sigv4 import AWSSigV4 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default region. my_config = Config( # Optionally lets you specify a Region other than your default. region_name='us-east-1' ) opensearch = boto3.client('opensearch', config=my_config) iam = boto3.client('iam', config=my_config) osis = boto3.client('osis', config=my_config) domainName = 'test-domain' # The name of the domain pipelineName = 'test-pipeline' # The name of the pipeline def createPipelineRole(iam, domainName): """Creates the pipeline role""" response = iam.create_policy( PolicyName='pipeline-policy', PolicyDocument=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Action\":\"es:DescribeDomain\",\"Resource\":\"arn:aws:es:us-east-1:123456789012:domain\/{domainName}\"}},{{\"Effect\":\"Allow\",\"Action\":\"es:ESHttp*\",\"Resource\":\"arn:aws:es:us-east-1:123456789012:domain\/{domainName}\/*\"}}]}}' ) policyarn = response['Policy']['Arn'] response = iam.create_role( RoleName='PipelineRole', AssumeRolePolicyDocument='{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"osis-pipelines.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}' ) rolename=response['Role']['RoleName'] response = iam.attach_role_policy( RoleName=rolename, PolicyArn=policyarn ) print('Creating pipeline role...') time.sleep(10) print('Role created: ' + rolename) def createDomain(opensearch, domainName): """Creates a domain to ingest data into""" response = opensearch.create_domain( DomainName=domainName, EngineVersion='OpenSearch_2.3', ClusterConfig={ 'InstanceType': 't2.small.search', 'InstanceCount': 5, 'DedicatedMasterEnabled': True, 'DedicatedMasterType': 't2.small.search', 'DedicatedMasterCount': 3 }, # Many instance types require EBS storage. EBSOptions={ 'EBSEnabled': True, 'VolumeType': 'gp2', 'VolumeSize': 10 }, AccessPolicies=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Principal\":{{\"AWS\":\"arn:aws:iam::123456789012:role\/PipelineRole\"}},\"Action\":\"es:*\",\"Resource\":\"arn:aws:es:us-east-1:123456789012:domain\/{domainName}\/*\"}}]}}', NodeToNodeEncryptionOptions={ 'Enabled': True } ) return(response) def waitForDomainProcessing(opensearch, domainName): """Waits for the domain to be active""" try: response = opensearch.describe_domain( DomainName=domainName ) # Every 30 seconds, check whether the domain is processing. while 'Endpoint' not in response['DomainStatus']: print('Creating domain...') time.sleep(60) response = opensearch.describe_domain( DomainName=domainName) # Once we exit the loop, the domain is ready for ingestion. endpoint = response['DomainStatus']['Endpoint'] print('Domain endpoint ready to receive data: ' + endpoint) createPipeline(osis, endpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceNotFoundException': print('Domain not found.') else: raise error def createPipeline(osis, endpoint): """Creates a pipeline using the domain and pipeline role""" try: definition = f'version: \"2\"\nlog-pipeline:\n source:\n http:\n path: \"/${{pipelineName}}/logs\"\n processor:\n - date:\n from_time_received: true\n destination: \"@timestamp\"\n sink:\n - opensearch:\n hosts: [ \"https://{endpoint}\" ]\n index: \"application_logs\"\n aws:\n sts_role_arn: \"arn:aws:iam::123456789012:role/PipelineRole\"\n region: \"us-east-1\"' response = osis.create_pipeline( PipelineName=pipelineName, MinUnits=4, MaxUnits=9, PipelineConfigurationBody=definition ) response = osis.get_pipeline( PipelineName=pipelineName ) # Every 30 seconds, check whether the pipeline is active. while response['Pipeline']['Status'] == 'CREATING': print('Creating pipeline...') time.sleep(30) response = osis.get_pipeline( PipelineName=pipelineName) # Once we exit the loop, the pipeline is ready for ingestion. ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] print('Pipeline ready to ingest data at endpoint: ' + ingestionEndpoint) ingestData(ingestionEndpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceAlreadyExistsException': print('Pipeline already exists.') response = osis.get_pipeline( PipelineName=pipelineName ) ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] ingestData(ingestionEndpoint) else: raise error def ingestData(ingestionEndpoint): """Ingests a sample log file into the pipeline""" endpoint = 'https://' + ingestionEndpoint r = requests.request('POST', f'{endpoint}/log-pipeline/logs', data='[{"time":"2014-08-11T11:40:13+00:00","remote_addr":"122.226.223.69","status":"404","request":"GET http://www.k2proxy.com//hello.html HTTP/1.1","http_user_agent":"Mozilla/4.0 (compatible; WOW64; SLCC2;)"}]', auth=AWSSigV4('osis')) print('Ingesting sample log file into pipeline') print('Response: ' + r.text) def main(): createPipelineRole(iam, domainName) createDomain(opensearch, domainName) waitForDomainProcessing(opensearch, domainName) if __name__ == "__main__": main()