Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwendung der AWS SDKs zur Interaktion mit Amazon Ingestion OpenSearch
Dieser Abschnitt enthält ein Beispiel für die Verwendung der AWS SDKs zur Interaktion mit Amazon OpenSearch Ingestion. Das Codebeispiel zeigt, wie Sie eine Domain und eine Pipeline erstellen und dann Daten in die Pipeline aufnehmen.
Themen
Python
Das folgende Beispielskript verwendet die, AWS SDK for Python (Boto3)requests
HTTP-Bibliothek eine Beispielprotokolldatei in die Pipeline aufgenommen.
Führen Sie die folgenden Befehle aus, um die erforderlichen Abhängigkeiten zu installieren:
pip install boto3 pip install botocore pip install requests pip install requests-auth-aws-sigv4
Ersetzen Sie innerhalb des Skripts die Konto-IDs in den Zugriffsrichtlinien durch Ihre AWS-Konto ID. Optional können Sie auch das region
ändern.
import boto3 import botocore from botocore.config import Config import requests from requests_auth_aws_sigv4 import AWSSigV4 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default region. my_config = Config( # Optionally lets you specify a Region other than your default. region_name='us-east-1' ) opensearch = boto3.client('opensearch', config=my_config) iam = boto3.client('iam', config=my_config) osis = boto3.client('osis', config=my_config) domainName = 'test-domain' # The name of the domain pipelineName = 'test-pipeline' # The name of the pipeline def createPipelineRole(iam, domainName): """Creates the pipeline role""" response = iam.create_policy( PolicyName='pipeline-policy', PolicyDocument=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Action\":\"es:DescribeDomain\",\"Resource\":\"arn:aws:es:us-east-1:
123456789012
:domain\/{domainName}\"}},{{\"Effect\":\"Allow\",\"Action\":\"es:ESHttp*\",\"Resource\":\"arn:aws:es:us-east-1:123456789012
:domain\/{domainName}\/*\"}}]}}' ) policyarn = response['Policy']['Arn'] response = iam.create_role( RoleName='PipelineRole', AssumeRolePolicyDocument='{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"osis-pipelines.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}' ) rolename=response['Role']['RoleName'] response = iam.attach_role_policy( RoleName=rolename, PolicyArn=policyarn ) print('Creating pipeline role...') time.sleep(10) print('Role created: ' + rolename) def createDomain(opensearch, domainName): """Creates a domain to ingest data into""" response = opensearch.create_domain( DomainName=domainName, EngineVersion='OpenSearch_2.3', ClusterConfig={ 'InstanceType': 't2.small.search', 'InstanceCount': 5, 'DedicatedMasterEnabled': True, 'DedicatedMasterType': 't2.small.search', 'DedicatedMasterCount': 3 }, # Many instance types require EBS storage. EBSOptions={ 'EBSEnabled': True, 'VolumeType': 'gp2', 'VolumeSize': 10 }, AccessPolicies=f'{{\"Version\":\"2012-10-17\",\"Statement\":[{{\"Effect\":\"Allow\",\"Principal\":{{\"AWS\":\"arn:aws:iam::123456789012
:role\/PipelineRole\"}},\"Action\":\"es:*\",\"Resource\":\"arn:aws:es:us-east-1:123456789012
:domain\/{domainName}\/*\"}}]}}', NodeToNodeEncryptionOptions={ 'Enabled': True } ) return(response) def waitForDomainProcessing(opensearch, domainName): """Waits for the domain to be active""" try: response = opensearch.describe_domain( DomainName=domainName ) # Every 30 seconds, check whether the domain is processing. while 'Endpoint' not in response['DomainStatus']: print('Creating domain...') time.sleep(60) response = opensearch.describe_domain( DomainName=domainName) # Once we exit the loop, the domain is ready for ingestion. endpoint = response['DomainStatus']['Endpoint'] print('Domain endpoint ready to receive data: ' + endpoint) createPipeline(osis, endpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceNotFoundException': print('Domain not found.') else: raise error def createPipeline(osis, endpoint): """Creates a pipeline using the domain and pipeline role""" try: definition = f'version: \"2\"\nlog-pipeline:\n source:\n http:\n path: \"/${{pipelineName}}/logs\"\n processor:\n - date:\n from_time_received: true\n destination: \"@timestamp\"\n sink:\n - opensearch:\n hosts: [ \"https://{endpoint}\" ]\n index: \"application_logs\"\n aws:\n sts_role_arn: \"arn:aws:iam::123456789012
:role/PipelineRole\"\n region: \"us-east-1\"' response = osis.create_pipeline( PipelineName=pipelineName, MinUnits=4, MaxUnits=9, PipelineConfigurationBody=definition ) response = osis.get_pipeline( PipelineName=pipelineName ) # Every 30 seconds, check whether the pipeline is active. while response['Pipeline']['Status'] == 'CREATING': print('Creating pipeline...') time.sleep(30) response = osis.get_pipeline( PipelineName=pipelineName) # Once we exit the loop, the pipeline is ready for ingestion. ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] print('Pipeline ready to ingest data at endpoint: ' + ingestionEndpoint) ingestData(ingestionEndpoint) except botocore.exceptions.ClientError as error: if error.response['Error']['Code'] == 'ResourceAlreadyExistsException': print('Pipeline already exists.') response = osis.get_pipeline( PipelineName=pipelineName ) ingestionEndpoint = response['Pipeline']['IngestEndpointUrls'][0] ingestData(ingestionEndpoint) else: raise error def ingestData(ingestionEndpoint): """Ingests a sample log file into the pipeline""" endpoint = 'https://' + ingestionEndpoint r = requests.request('POST', f'{endpoint}/log-pipeline/logs', data='[{"time":"2014-08-11T11:40:13+00:00","remote_addr":"122.226.223.69","status":"404","request":"GET http://www.k2proxy.com//hello.html HTTP/1.1","http_user_agent":"Mozilla/4.0 (compatible; WOW64; SLCC2;)"}]', auth=AWSSigV4('osis')) print('Ingesting sample log file into pipeline') print('Response: ' + r.text) def main(): createPipelineRole(iam, domainName) createDomain(opensearch, domainName) waitForDomainProcessing(opensearch, domainName) if __name__ == "__main__": main()