Package software.amazon.awscdk.services.kinesis
Amazon Kinesis Construct Library
Amazon Kinesis provides collection and processing of large streams of data records in real time. Kinesis data streams can be used for rapid and continuous data intake and aggregation.
Table Of Contents
Streams
Amazon Kinesis Data Streams ingests a large amount of data in real time, durably stores the data, and makes the data available for consumption.
Using the CDK, a new Kinesis stream can be created as part of the stack using the construct's constructor. You may specify the streamName
to give
your own identifier to the stream. If not, CloudFormation will generate a name.
Stream.Builder.create(this, "MyFirstStream") .streamName("my-awesome-stream") .build();
You can also specify properties such as shardCount
to indicate how many shards the stream should choose and a retentionPeriod
to specify how long the data in the shards should remain accessible.
Read more at Creating and Managing Streams
Stream.Builder.create(this, "MyFirstStream") .streamName("my-awesome-stream") .shardCount(3) .retentionPeriod(Duration.hours(48)) .build();
Encryption
Stream encryption enables server-side encryption using an AWS KMS key for a specified stream.
Encryption is enabled by default on your stream with the master key owned by Kinesis Data Streams in regions where it is supported.
new Stream(this, "MyEncryptedStream");
You can enable encryption on your stream with a user-managed key by specifying the encryption
property.
A KMS key will be created for you and associated with the stream.
Stream.Builder.create(this, "MyEncryptedStream") .encryption(StreamEncryption.KMS) .build();
You can also supply your own external KMS key to use for stream encryption by specifying the encryptionKey
property.
Key key = new Key(this, "MyKey"); Stream.Builder.create(this, "MyEncryptedStream") .encryption(StreamEncryption.KMS) .encryptionKey(key) .build();
Import
Any Kinesis stream that has been created outside the stack can be imported into your CDK app.
Streams can be imported by their ARN via the Stream.fromStreamArn()
API
IStream importedStream = Stream.fromStreamArn(this, "ImportedStream", "arn:aws:kinesis:us-east-2:123456789012:stream/f3j09j2230j");
Encrypted Streams can also be imported by their attributes via the Stream.fromStreamAttributes()
API
IStream importedStream = Stream.fromStreamAttributes(this, "ImportedEncryptedStream", StreamAttributes.builder() .streamArn("arn:aws:kinesis:us-east-2:123456789012:stream/f3j09j2230j") .encryptionKey(Key.fromKeyArn(this, "key", "arn:aws:kms:us-east-1:123456789012:key/12345678-1234-1234-1234-123456789012")) .build());
Permission Grants
IAM roles, users or groups which need to be able to work with Amazon Kinesis streams at runtime should be granted IAM permissions.
Any object that implements the IGrantable
interface (has an associated principal) can be granted permissions by calling:
grantRead(principal)
- grants the principal read accessgrantWrite(principal)
- grants the principal write permissions to a StreamgrantReadWrite(principal)
- grants principal read and write permissions
Read Permissions
Grant read
access to a stream by calling the grantRead()
API.
If the stream has an encryption key, read permissions will also be granted to the key.
Role lambdaRole = Role.Builder.create(this, "Role") .assumedBy(new ServicePrincipal("lambda.amazonaws.com")) .description("Example role...") .build(); Stream stream = Stream.Builder.create(this, "MyEncryptedStream") .encryption(StreamEncryption.KMS) .build(); // give lambda permissions to read stream stream.grantRead(lambdaRole);
The following read permissions are provided to a service principal by the grantRead()
API:
kinesis:DescribeStreamSummary
kinesis:GetRecords
kinesis:GetShardIterator
kinesis:ListShards
kinesis:SubscribeToShard
Write Permissions
Grant write
permissions to a stream is provided by calling the grantWrite()
API.
If the stream has an encryption key, write permissions will also be granted to the key.
Role lambdaRole = Role.Builder.create(this, "Role") .assumedBy(new ServicePrincipal("lambda.amazonaws.com")) .description("Example role...") .build(); Stream stream = Stream.Builder.create(this, "MyEncryptedStream") .encryption(StreamEncryption.KMS) .build(); // give lambda permissions to write to stream stream.grantWrite(lambdaRole);
The following write permissions are provided to a service principal by the grantWrite()
API:
kinesis:ListShards
kinesis:PutRecord
kinesis:PutRecords
Custom Permissions
You can add any set of permissions to a stream by calling the grant()
API.
User user = new User(this, "MyUser"); Stream stream = new Stream(this, "MyStream"); // give my user permissions to list shards stream.grant(user, "kinesis:ListShards");
Metrics
You can use common metrics from your stream to create alarms and/or dashboards. The stream.metric('MetricName')
method creates a metric with the stream namespace and dimension. You can also use pre-define methods like stream.metricGetRecordsSuccess()
. To find out more about Kinesis metrics check Monitoring the Amazon Kinesis Data Streams Service with Amazon CloudWatch.
Stream stream = new Stream(this, "MyStream"); // Using base metric method passing the metric name stream.metric("GetRecords.Success"); // using pre-defined metric method stream.metricGetRecordsSuccess(); // using pre-defined and overriding the statistic stream.metricGetRecordsSuccess(MetricOptions.builder().statistic("Maximum").build());
Resource Policy
You can create a resource policy for a data stream. For more information, see Controlling access to Amazon Kinesis Data Streams resources using IAM.
A resource policy is automatically created when addToResourcePolicy
is called, if one doesn't already exist.
Using addToResourcePolicy
is the simplest way to add a resource policy:
Stream stream = new Stream(this, "MyStream"); // create a resource policy via addToResourcePolicy method stream.addToResourcePolicy(PolicyStatement.Builder.create() .resources(List.of(stream.getStreamArn())) .actions(List.of("kinesis:GetRecords")) .principals(List.of(new AnyPrincipal())) .build());
You can create a resource manually by using ResourcePolicy
.
Also, you can set a custom policy document to ResourcePolicy
.
If not, a blank policy document will be set.
Stream stream = new Stream(this, "MyStream"); // create a custom policy document PolicyDocument policyDocument = PolicyDocument.Builder.create() .assignSids(true) .statements(List.of( PolicyStatement.Builder.create() .actions(List.of("kinesis:GetRecords")) .resources(List.of(stream.getStreamArn())) .principals(List.of(new AnyPrincipal())) .build())) .build(); // create a resource policy manually // create a resource policy manually ResourcePolicy.Builder.create(this, "ResourcePolicy") .stream(stream) .policyDocument(policyDocument) .build();
-
ClassDescriptionAttaches a resource-based policy to a data stream or registered consumer.A fluent builder for
CfnResourcePolicy
.Properties for defining aCfnResourcePolicy
.A builder forCfnResourcePolicyProps
An implementation forCfnResourcePolicyProps
Creates a Kinesis stream that captures and transports data records that are emitted from data sources.A fluent builder forCfnStream
.Enables or updates server-side encryption using an AWS KMS key for a specified stream.A builder forCfnStream.StreamEncryptionProperty
An implementation forCfnStream.StreamEncryptionProperty
Specifies the capacity mode to which you want to set your data stream.A builder forCfnStream.StreamModeDetailsProperty
An implementation forCfnStream.StreamModeDetailsProperty
Use the AWS CloudFormationAWS::Kinesis::StreamConsumer
resource to register a consumer with a Kinesis data stream.A fluent builder forCfnStreamConsumer
.Properties for defining aCfnStreamConsumer
.A builder forCfnStreamConsumerProps
An implementation forCfnStreamConsumerProps
Properties for defining aCfnStream
.A builder forCfnStreamProps
An implementation forCfnStreamProps
A Kinesis Stream.Internal default implementation forIStream
.A proxy class which represents a concrete javascript instance of this type.The policy for a data stream or registered consumer.A fluent builder forResourcePolicy
.Properties to associate a data stream with a policy.A builder forResourcePolicyProps
An implementation forResourcePolicyProps
A Kinesis stream.A fluent builder forStream
.A reference to a stream.A builder forStreamAttributes
An implementation forStreamAttributes
What kind of server-side encryption to apply to this stream.Specifies the capacity mode to apply to this stream.Properties for a Kinesis Stream.A builder forStreamProps
An implementation forStreamProps