Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Managed Service for Apache Flink CloudFormation で を使用する
次の演習では、同じスタックで Lambda 関数 AWS CloudFormation を使用して で作成された Flink アプリケーションを開始する方法を示します。
開始する前に
この演習を開始する前に、:AWS::KinesisAnalytics:Application AWS CloudFormation で を使用して Flink アプリケーションを作成するステップに従います。
Lambda 関数の書き込み
作成または更新後に Flink アプリケーションを起動するには、kinesisanalyticsv2 start-application を使用しますAPI。呼び出しは、Flink アプリケーションの作成後に AWS CloudFormation イベントによってトリガーされます。Lambda 関数をトリガーするためのスタックの設定方法については、このエクササイズの後半で説明しますが、まずは Lambda 関数の宣言とそのコードに焦点を当てます。この例では「Python3.8
」ランタイムを使用しています。
StartApplicationLambda: Type: AWS::Lambda::Function DependsOn: StartApplicationLambdaRole Properties: Description: Starts an application when invoked. Runtime: python3.8 Role: !GetAtt StartApplicationLambdaRole.Arn Handler: index.lambda_handler Timeout: 30 Code: ZipFile: | import logging import cfnresponse import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): logger.info('Incoming CFN event {}'.format(event)) try: application_name = event['ResourceProperties']['ApplicationName'] # filter out events other than Create or Update, # you can also omit Update in order to start an application on Create only. if event['RequestType'] not in ["Create", "Update"]: logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # use kinesisanalyticsv2 API to start an application. client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region']) # get application status. describe_response = client_kda.describe_application(ApplicationName=application_name) application_status = describe_response['ApplicationDetail']['ApplicationStatus'] # an application can be started from 'READY' status only. if application_status != 'READY': logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # create RunConfiguration. run_configuration = { 'ApplicationRestoreConfiguration': { 'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT', } } logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) # this call doesn't wait for an application to transfer to 'RUNNING' state. client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration) logger.info('Started Application: {}'.format(application_name)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) except Exception as err: logger.error(err) cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
上記のコードでは、Lambda は受信 AWS CloudFormation イベントを処理し、 Create
と 以外のすべてをフィルタリングしUpdate
、アプリケーションの状態を取得し、状態が の場合は起動しますREADY
。アプリケーションの状態を取得するには、次のように Lambda ロールを作成する必要があります。
Lambda ロールを作成する
Lambda がアプリケーションと正常に「対話」してログを書き込むためのロールを作成します。このロールはデフォルトのマネージドポリシーを使用しますが、カスタムポリシーを使用するように絞り込むこともできます。
StartApplicationLambdaRole: Type: AWS::IAM::Role DependsOn: TestFlinkApplication Properties: Description: A role for lambda to use while interacting with an application. AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess Path: /
Lambda リソースは Flink アプリケーションに依存しているため、同じスタックで Flink アプリケーションを作成した後に作成されることに注意してください。
Lambda 関数の呼び出し
あとは、Lambda 関数を呼び出すだけです。これは、カスタムリソース を使用して行います。
StartApplicationLambdaInvoke: Description: Invokes StartApplicationLambda to start an application. Type: AWS::CloudFormation::CustomResource DependsOn: StartApplicationLambda Version: "1.0" Properties: ServiceToken: !GetAtt StartApplicationLambda.Arn Region: !Ref AWS::Region ApplicationName: !Ref TestFlinkApplication
Lambdaを使って Flink アプリケーションを起動するのに必要なものはこれだけです。これで、独自のスタックを作成するか、以下の完全な例を使用して、これらすべてのステップが実際にどのように機能するかを確認する準備ができました。
拡張例を確認する
次の例は、テンプレートパラメータ を使用して追加のRunConfiguration
調整を行う、前のステップの若干拡張されたバージョンです。これは、試してみるための作業スタックです。添付の注意事項を必ずお読みください。
stack.yaml
Description: 'kinesisanalyticsv2 CloudFormation Test Application' Parameters: ApplicationRestoreType: Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT. Type: String Default: SKIP_RESTORE_FROM_SNAPSHOT AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ] SnapshotName: Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType. Type: String Default: '' AllowNonRestoredState: Description: FlinkRunConfiguration option, can be true or false. Default: true Type: String AllowedValues: [ true, false ] CodeContentBucketArn: Description: ARN of a bucket with application code. Type: String CodeContentFileKey: Description: A jar filename with an application code inside a bucket. Type: String Conditions: IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ] Resources: TestServiceExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - kinesisanlaytics.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonKinesisFullAccess - arn:aws:iam::aws:policy/AmazonS3FullAccess Path: / InputKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 OutputKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 TestFlinkApplication: Type: 'AWS::kinesisanalyticsv2::Application' Properties: ApplicationName: 'CFNTestFlinkApplication' ApplicationDescription: 'Test Flink Application' RuntimeEnvironment: 'FLINK-1_18' ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn ApplicationConfiguration: EnvironmentProperties: PropertyGroups: - PropertyGroupId: 'KinesisStreams' PropertyMap: INPUT_STREAM_NAME: !Ref InputKinesisStream OUTPUT_STREAM_NAME: !Ref OutputKinesisStream AWS_REGION: !Ref AWS::Region FlinkApplicationConfiguration: CheckpointConfiguration: ConfigurationType: 'CUSTOM' CheckpointingEnabled: True CheckpointInterval: 1500 MinPauseBetweenCheckpoints: 500 MonitoringConfiguration: ConfigurationType: 'CUSTOM' MetricsLevel: 'APPLICATION' LogLevel: 'INFO' ParallelismConfiguration: ConfigurationType: 'CUSTOM' Parallelism: 1 ParallelismPerKPU: 1 AutoScalingEnabled: True ApplicationSnapshotConfiguration: SnapshotsEnabled: True ApplicationCodeConfiguration: CodeContent: S3ContentLocation: BucketARN: !Ref CodeContentBucketArn FileKey: !Ref CodeContentFileKey CodeContentType: 'ZIPFILE' StartApplicationLambdaRole: Type: AWS::IAM::Role DependsOn: TestFlinkApplication Properties: Description: A role for lambda to use while interacting with an application. AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess Path: / StartApplicationLambda: Type: AWS::Lambda::Function DependsOn: StartApplicationLambdaRole Properties: Description: Starts an application when invoked. Runtime: python3.8 Role: !GetAtt StartApplicationLambdaRole.Arn Handler: index.lambda_handler Timeout: 30 Code: ZipFile: | import logging import cfnresponse import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): logger.info('Incoming CFN event {}'.format(event)) try: application_name = event['ResourceProperties']['ApplicationName'] # filter out events other than Create or Update, # you can also omit Update in order to start an application on Create only. if event['RequestType'] not in ["Create", "Update"]: logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # use kinesisanalyticsv2 API to start an application. client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region']) # get application status. describe_response = client_kda.describe_application(ApplicationName=application_name) application_status = describe_response['ApplicationDetail']['ApplicationStatus'] # an application can be started from 'READY' status only. if application_status != 'READY': logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # create RunConfiguration from passed parameters. run_configuration = { 'FlinkRunConfiguration': { 'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true' }, 'ApplicationRestoreConfiguration': { 'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'], } } # add SnapshotName to RunConfiguration if specified. if event['ResourceProperties']['SnapshotName'] != '': run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName'] logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) # this call doesn't wait for an application to transfer to 'RUNNING' state. client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration) logger.info('Started Application: {}'.format(application_name)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) except Exception as err: logger.error(err) cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)}) StartApplicationLambdaInvoke: Description: Invokes StartApplicationLambda to start an application. Type: AWS::CloudFormation::CustomResource DependsOn: StartApplicationLambda Version: "1.0" Properties: ServiceToken: !GetAtt StartApplicationLambda.Arn Region: !Ref AWS::Region ApplicationName: !Ref TestFlinkApplication ApplicationRestoreType: !Ref ApplicationRestoreType SnapshotName: !Ref SnapshotName AllowNonRestoredState: !Ref AllowNonRestoredState
繰り返しになりますが、アプリケーション自体だけでなく、Lambda のロールも調整したい場合があります。
上記のスタックを作成する前に、パラメータを指定することを忘れないでください。
parameters.json
[ { "ParameterKey": "CodeContentBucketArn", "ParameterValue": "YOUR_BUCKET_ARN" }, { "ParameterKey": "CodeContentFileKey", "ParameterValue": "YOUR_JAR" }, { "ParameterKey": "ApplicationRestoreType", "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT" }, { "ParameterKey": "AllowNonRestoredState", "ParameterValue": "true" } ]
「YOUR_BUCKET_ARN
」と「YOUR_JAR
」を特定の要件に置き換えてください。この「ガイド」に従って Amazon S3 バケットとアプリケーション jar を作成できます。
次に、スタックを作成します (YOUR_REGION を、us-east-1 などの任意のリージョンに置き換えます)。
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
これで、https://console.aws.amazon.com/cloudformationStarting
」状態になるはずです。「Running
」の起動には数分かかる場合があります。
詳細については、次を参照してください。