Amazon OpenSearch Service Construct Library

See Migrating to OpenSearch for migration instructions from aws-cdk-lib/aws-elasticsearch to this module, aws-cdk-lib/aws-opensearchservice.

Quick start

Create a development cluster by simply specifying the version:

dev_domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0
)

To perform version upgrades without replacing the entire domain, set the enable_version_upgrade property.

dev_domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    enable_version_upgrade=True
)

Create a cluster with GP3 volumes:

gp3_domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_2_5,
    ebs=EbsOptions(
        volume_size=30,
        volume_type=ec2.EbsDeviceVolumeType.GP3,
        throughput=125,
        iops=3000
    )
)

Create a production-grade cluster by also specifying settings such as capacity and Availability Zone distribution:

prod_domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    capacity=CapacityConfig(
        master_nodes=5,
        data_nodes=20
    ),
    ebs=EbsOptions(
        volume_size=20
    ),
    zone_awareness=ZoneAwarenessConfig(
        availability_zone_count=3
    ),
    logging=LoggingOptions(
        slow_search_log_enabled=True,
        app_log_enabled=True,
        slow_index_log_enabled=True
    )
)

This creates an Amazon OpenSearch Service cluster and automatically sets up log groups for the application logs, slow search logs, and slow index logs enabled above.

A note about SLR

Some cluster configurations (e.g., VPC access) require the existence of the AWSServiceRoleForAmazonElasticsearchService Service-Linked Role.

When performing such operations via the AWS Console, this SLR is created automatically when needed. However, this is not the behavior when using CloudFormation. If an SLR is needed, but doesn’t exist, you will encounter a failure message similar to:

Before you can proceed, you must enable a service-linked role to give Amazon OpenSearch Service...

To resolve this, you need to create the SLR. We recommend using the AWS CLI:

aws iam create-service-linked-role --aws-service-name es.amazonaws.com

You can also create it using the CDK, but note that only the first application deploying this will succeed:

slr = iam.CfnServiceLinkedRole(self, "Service Linked Role",
    aws_service_name="es.amazonaws.com"
)

Importing existing domains

Using a known domain endpoint

To import an existing domain into your CDK application, use the Domain.from_domain_endpoint factory method. It accepts the endpoint of an already existing domain:

domain_endpoint = "https://my-domain-jcjotrt6f7otem4sqcwbch3c4u.us-east-1.es.amazonaws.com"
domain = Domain.from_domain_endpoint(self, "ImportedDomain", domain_endpoint)
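The endpoint string itself carries the domain name and region. As a rough illustration only, the sketch below splits such an endpoint with the standard library. The helper split_endpoint is hypothetical and not part of the CDK API; it assumes the host looks like `<name>-<id>.<region>.es.amazonaws.com`, optionally with a `search-` or `vpc-` prefix.

```python
from urllib.parse import urlparse


def split_endpoint(endpoint: str) -> dict:
    """Split a domain endpoint URL into name, unique id, and region.

    Hypothetical helper; assumes the host shape described above.
    """
    host = urlparse(endpoint).hostname
    if host is None:
        raise ValueError(f"not a URL with a host: {endpoint!r}")
    labels = host.split(".")
    if len(labels) < 2:
        raise ValueError(f"unexpected host: {host!r}")
    # The first label is "<name>-<id>"; the second label is the region.
    name, _, unique_id = labels[0].rpartition("-")
    for prefix in ("search-", "vpc-"):
        if name.startswith(prefix):
            name = name[len(prefix):]
            break
    return {"name": name, "id": unique_id, "region": labels[1]}


parts = split_endpoint(
    "https://my-domain-jcjotrt6f7otem4sqcwbch3c4u.us-east-1.es.amazonaws.com"
)
print(parts)
```

A check like this can catch a mistyped endpoint before it is passed to the factory method.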

Using the output of another CloudFormation stack

To import an existing domain using values exported from another CloudFormation stack, use the Domain.from_domain_attributes factory method. It accepts tokens.

domain_arn = Fn.import_value("another-cf-stack-export-domain-arn")
domain_endpoint = Fn.import_value("another-cf-stack-export-domain-endpoint")
domain = Domain.from_domain_attributes(self, "ImportedDomain",
    domain_arn=domain_arn,
    domain_endpoint=domain_endpoint
)

Permissions

IAM

Helper methods also exist for managing access to the domain.

# fn: lambda.Function
# domain: Domain


# Grant write access to the app-search index
domain.grant_index_write("app-search", fn)

# Grant read access to the 'app-search/_search' path
domain.grant_path_read("app-search/_search", fn)

Encryption

The domain can also be created with encryption enabled:

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    ebs=EbsOptions(
        volume_size=100,
        volume_type=ec2.EbsDeviceVolumeType.GENERAL_PURPOSE_SSD
    ),
    node_to_node_encryption=True,
    encryption_at_rest=EncryptionAtRestOptions(
        enabled=True
    )
)

This sets up the domain with node to node encryption and encryption at rest. You can also choose to supply your own KMS key to use for encryption at rest.

VPC Support

Domains can be placed inside a VPC, providing secure communication between Amazon OpenSearch Service and other services within the VPC without the need for an internet gateway, NAT device, or VPN connection.

vpc = ec2.Vpc(self, "Vpc")
# Properties are passed as keyword arguments; a props object cannot be passed positionally.
Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    removal_policy=RemovalPolicy.DESTROY,
    vpc=vpc,
    # must be enabled since our VPC contains multiple private subnets.
    zone_awareness=ZoneAwarenessConfig(
        enabled=True
    ),
    capacity=CapacityConfig(
        # must be an even number since the default az count is 2.
        data_nodes=2
    )
)

In addition, you can use the vpc_subnets property to control which subnets are used, and the security_groups property to control which security groups are attached to the domain. By default, CDK selects all private subnets in the VPC and creates one dedicated security group.
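The comment in the VPC example notes that the data node count must be even because the default Availability Zone count is 2. A minimal sketch of that check, generalized to "a multiple of the zone count", might look like the following. The helper check_data_nodes is hypothetical and not part of the CDK, and the generalization beyond two zones is an assumption.

```python
def check_data_nodes(data_nodes: int, availability_zone_count: int = 2) -> None:
    """Raise ValueError if data nodes cannot be spread evenly across zones.

    Hypothetical pre-flight check; the default of 2 zones follows the
    comment in the VPC example.
    """
    if availability_zone_count < 1:
        raise ValueError("availability_zone_count must be at least 1")
    if data_nodes % availability_zone_count != 0:
        raise ValueError(
            f"data_nodes={data_nodes} is not a multiple of "
            f"availability_zone_count={availability_zone_count}"
        )


check_data_nodes(2)      # two nodes across the default two zones: accepted
check_data_nodes(3, 3)   # three nodes across three zones: accepted
```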

Metrics

Helper methods exist to access common domain metrics, for example:

# domain: Domain

free_storage_space = domain.metric_free_storage_space()
master_sys_memory_utilization = domain.metric("MasterSysMemoryUtilization")

This module is part of the AWS Cloud Development Kit project.

Fine grained access control

The domain can also be created with a master user configured. The password can be supplied or dynamically created if not supplied.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    enforce_https=True,
    node_to_node_encryption=True,
    encryption_at_rest=EncryptionAtRestOptions(
        enabled=True
    ),
    fine_grained_access_control=AdvancedSecurityOptions(
        master_user_name="master-user"
    )
)

master_user_password = domain.master_user_password

SAML authentication

You can enable SAML authentication to use your existing identity provider to offer single sign-on (SSO) for dashboards on Amazon OpenSearch Service domains running OpenSearch or Elasticsearch 6.7 or later. To use SAML authentication, fine-grained access control must be enabled.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    enforce_https=True,
    node_to_node_encryption=True,
    encryption_at_rest=EncryptionAtRestOptions(
        enabled=True
    ),
    fine_grained_access_control=AdvancedSecurityOptions(
        master_user_name="master-user",
        saml_authentication_enabled=True,
        saml_authentication_options=SAMLOptionsProperty(
            idp_entity_id="entity-id",
            idp_metadata_content="metadata-content-with-quotes-escaped"
        )
    )
)

Using unsigned basic auth

For convenience, the domain can be configured to allow unsigned HTTP requests that use basic auth. Unless the domain is configured to be part of a VPC, this means anyone can access the domain using the configured master username and password.

To enable unsigned basic auth access, the domain is configured with an access policy that allows anonymous requests, required HTTPS, node-to-node encryption, encryption at rest, and fine-grained access control.

If the above settings are not set, they will be configured as part of enabling unsigned basic auth. If they are set with conflicting values, an error will be thrown.

If no master user is configured, a default master user is created with the username admin.

If no password is configured, a default master user password is created and stored as a secret in AWS Secrets Manager. The secret has the prefix <domain id>MasterUser.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    use_unsigned_basic_auth=True
)

master_user_password = domain.master_user_password
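On the client side, an unsigned request with basic auth simply carries an Authorization header built from the master username and password. The sketch below builds such a request with the standard library without sending it. The helper basic_auth_request, the endpoint, and the credentials are all placeholders for illustration; in practice the password would be read from the secret rather than hard-coded.

```python
import base64
from urllib.request import Request


def basic_auth_request(endpoint: str, path: str, username: str, password: str) -> Request:
    """Build (but do not send) a request that uses HTTP basic auth."""
    # Basic auth is the base64 encoding of "username:password".
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    url = f"{endpoint.rstrip('/')}/{path.lstrip('/')}"
    return Request(url, headers={"Authorization": f"Basic {token}"})


# Placeholder endpoint and credentials, for illustration only.
req = basic_auth_request("https://search.example.com", "_cluster/health", "admin", "secret")
print(req.full_url)
print(req.get_header("Authorization"))
```

Passing the request to urllib.request.urlopen would send it over HTTPS, which the domain requires when unsigned basic auth is enabled.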

Custom access policies

If the domain requires custom access control it can be configured either as a constructor property, or later by means of a helper method.

For simple permissions, the access_policies constructor property may be sufficient:

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    access_policies=[
        iam.PolicyStatement(
            actions=["es:*ESHttpPost", "es:ESHttpPut*"],
            effect=iam.Effect.ALLOW,
            principals=[iam.AccountPrincipal("123456789012")],
            resources=["*"]
        )
    ]
)

For more complex use cases, such as setting the domain up to receive data from a cross-account Kinesis Firehose, the add_access_policies helper method allows for policies that include the explicit domain ARN.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0
)
domain.add_access_policies(
    iam.PolicyStatement(
        actions=["es:ESHttpPost", "es:ESHttpPut"],
        effect=iam.Effect.ALLOW,
        principals=[iam.AccountPrincipal("123456789012")],
        resources=[domain.domain_arn, f"{domain.domain_arn}/*"]
    ),
    iam.PolicyStatement(
        actions=["es:ESHttpGet"],
        effect=iam.Effect.ALLOW,
        principals=[iam.AccountPrincipal("123456789012")],
        resources=[
            f"{domain.domain_arn}/_all/_settings",
            f"{domain.domain_arn}/_cluster/stats",
            f"{domain.domain_arn}/index-name*/_mapping/type-name",
            f"{domain.domain_arn}/roletest*/_mapping/roletest",
            f"{domain.domain_arn}/_nodes",
            f"{domain.domain_arn}/_nodes/stats",
            f"{domain.domain_arn}/_nodes/*/stats",
            f"{domain.domain_arn}/_stats",
            f"{domain.domain_arn}/index-name*/_stats",
            f"{domain.domain_arn}/roletest*/_stats"
        ]
    ))
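Long resource lists like the one above are easier to maintain when the paths are kept separate from the ARN. The following is a small sketch of that idea using plain strings. The helper domain_resources is hypothetical, and the ARN shown is a made-up example rather than a real domain.

```python
from typing import List, Sequence


def domain_resources(domain_arn: str, paths: Sequence[str]) -> List[str]:
    """Prefix each API path with the domain ARN, normalizing leading slashes."""
    return [f"{domain_arn}/{path.lstrip('/')}" for path in paths]


# Made-up ARN for illustration.
example_arn = "arn:aws:es:us-east-1:123456789012:domain/my-domain"
read_paths = ["_all/_settings", "_cluster/stats", "/_nodes", "_nodes/*/stats"]

for resource in domain_resources(example_arn, read_paths):
    print(resource)
```

The resulting list could then be passed as the resources argument of a policy statement.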

Audit logs

Audit logs can be enabled for a domain, but only when fine grained access control is enabled.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    enforce_https=True,
    node_to_node_encryption=True,
    encryption_at_rest=EncryptionAtRestOptions(
        enabled=True
    ),
    fine_grained_access_control=AdvancedSecurityOptions(
        master_user_name="master-user"
    ),
    logging=LoggingOptions(
        audit_log_enabled=True,
        slow_search_log_enabled=True,
        app_log_enabled=True,
        slow_index_log_enabled=True
    )
)

Suppress creating CloudWatch Logs resource policy

When logging is enabled for the domain, a CloudWatch Logs resource policy is created by default. This resource policy is necessary for logging, but because at most 10 resource policies can be created per region, you may hit that limit when enabling logging for several domains. Setting the suppress_logs_resource_policy option to True suppresses the creation of the CloudWatch Logs resource policy.

If you set suppress_logs_resource_policy to True, you must create a resource policy yourself before deployment. To avoid reaching the limit, consider reusing a broader policy that covers multiple log groups.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    enforce_https=True,
    node_to_node_encryption=True,
    encryption_at_rest=EncryptionAtRestOptions(
        enabled=True
    ),
    fine_grained_access_control=AdvancedSecurityOptions(
        master_user_name="master-user"
    ),
    logging=LoggingOptions(
        audit_log_enabled=True,
        slow_search_log_enabled=True,
        app_log_enabled=True,
        slow_index_log_enabled=True
    ),
    suppress_logs_resource_policy=True
)
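When the policy is suppressed, the broader policy has to be created separately. As a sketch of what such a policy document could contain, the code below builds one with the standard library. The helper logs_resource_policy is hypothetical, and the log group ARN with its wildcard prefix is an assumption for illustration; adjust it to the log groups you actually use.

```python
import json
from typing import Sequence


def logs_resource_policy(log_group_arns: Sequence[str]) -> str:
    """Return a CloudWatch Logs resource policy document as a JSON string."""
    statement = {
        "Effect": "Allow",
        # OpenSearch Service writes logs using the es.amazonaws.com principal.
        "Principal": {"Service": "es.amazonaws.com"},
        "Action": ["logs:PutLogEvents", "logs:CreateLogStream"],
        # A trailing ":*" extends each ARN to the log streams in the group.
        "Resource": [f"{arn}:*" for arn in log_group_arns],
    }
    return json.dumps({"Version": "2012-10-17", "Statement": [statement]}, indent=2)


# Hypothetical ARN: the wildcard covers every log group under one prefix.
policy = logs_resource_policy(
    ["arn:aws:logs:us-east-1:123456789012:log-group:/aws/opensearch/domains/*"]
)
print(policy)
```

A document like this can be registered once, for example with the aws logs put-resource-policy command, and shared by several domains.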

UltraWarm

UltraWarm nodes can be enabled to provide a cost-effective way to store large amounts of read-only data.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    capacity=CapacityConfig(
        master_nodes=2,
        warm_nodes=2,
        warm_instance_type="ultrawarm1.medium.search"
    )
)

Cold storage

Cold storage can be enabled on the domain. You must enable UltraWarm storage to enable cold storage.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    capacity=CapacityConfig(
        master_nodes=2,
        warm_nodes=2,
        warm_instance_type="ultrawarm1.medium.search"
    ),
    cold_storage_enabled=True
)

Custom endpoint

Custom endpoints can be configured to reach the domain under a custom domain name.

Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    custom_endpoint=CustomEndpointOptions(
        domain_name="search.example.com"
    )
)

It is also possible to specify a custom certificate instead of the auto-generated one.

Additionally, a CNAME record is created automatically if a hosted zone is provided for the custom endpoint.

Advanced options

Advanced options can be used to configure additional options.

Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    advanced_options={
        "rest.action.multi.allow_explicit_index": "false",
        "indices.fielddata.cache.size": "25",
        "indices.query.bool.max_clause_count": "2048"
    }
)
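Note that every value in the example above is a string, including the boolean and numeric settings. If the settings start out as native Python values, a small conversion step can produce the string form. The helper to_advanced_options below is a hypothetical sketch, not part of the CDK.

```python
def to_advanced_options(options: dict) -> dict:
    """Convert native Python values to the string values used in the example."""
    result = {}
    for key, value in options.items():
        if isinstance(value, bool):
            # Booleans are written in lowercase, as in "false" above.
            result[key] = "true" if value else "false"
        else:
            result[key] = str(value)
    return result


advanced = to_advanced_options({
    "rest.action.multi.allow_explicit_index": False,
    "indices.fielddata.cache.size": 25,
    "indices.query.bool.max_clause_count": 2048,
})
print(advanced)
```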

Amazon Cognito authentication for OpenSearch Dashboards

The domain can be configured to use Amazon Cognito authentication for OpenSearch Dashboards.

# cognito_configuration_role: iam.Role


domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_0,
    cognito_dashboards_auth=CognitoOptions(
        role=cognito_configuration_role,
        identity_pool_id="example-identity-pool-id",
        user_pool_id="example-user-pool-id"
    )
)

Enable support for Multi-AZ with Standby deployment

The domain can be configured to use multi-AZ with standby.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_3,
    ebs=EbsOptions(
        volume_size=10,
        volume_type=ec2.EbsDeviceVolumeType.GENERAL_PURPOSE_SSD_GP3
    ),
    zone_awareness=ZoneAwarenessConfig(
        enabled=True,
        availability_zone_count=3
    ),
    capacity=CapacityConfig(
        multi_az_with_standby_enabled=True,
        master_nodes=3,
        data_nodes=3
    )
)

Define off-peak windows

The domain can be configured with a daily 10-hour window that is treated as off-peak hours.

Off-peak windows were introduced on February 16, 2023. All domains created before this date have the off-peak window disabled by default. You must manually enable and configure the off-peak window for these domains. All domains created after this date will have the off-peak window enabled by default. You can’t disable the off-peak window for a domain after it’s enabled.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_3,
    off_peak_window_enabled=True,  # can be omitted if off_peak_window_start is set
    off_peak_window_start=WindowStartTime(
        hours=20,
        minutes=0
    )
)
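Because the window lasts 10 hours, a start time of 20:00 wraps past midnight and ends at 06:00 the next day. The sketch below works out the end time for any start time. The helper off_peak_window_end is hypothetical, and it assumes the start time is expressed in 24-hour UTC time.

```python
OFF_PEAK_WINDOW_MINUTES = 10 * 60  # the window is 10 hours long


def off_peak_window_end(hours: int, minutes: int = 0) -> tuple:
    """Return (hours, minutes) at which a window starting at the given time ends."""
    if not (0 <= hours <= 23 and 0 <= minutes <= 59):
        raise ValueError("hours must be 0-23 and minutes 0-59")
    total = hours * 60 + minutes + OFF_PEAK_WINDOW_MINUTES
    # Wrap around midnight so the result is a valid time of day.
    return (total // 60) % 24, total % 60


end_hours, end_minutes = off_peak_window_end(20, 0)
print(f"window ends at {end_hours:02d}:{end_minutes:02d}")
```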

Configuring service software updates

The domain can be configured to apply service software updates automatically.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_3,
    enable_auto_software_update=True
)

IP address type

You can specify either dual stack or IPv4 as your IP address type.

domain = Domain(self, "Domain",
    version=EngineVersion.OPENSEARCH_1_3,
    ip_address_type=IpAddressType.DUAL_STACK
)