Seleziona le tue preferenze relative ai cookie

Utilizziamo cookie essenziali e strumenti simili necessari per fornire il nostro sito e i nostri servizi. Utilizziamo i cookie prestazionali per raccogliere statistiche anonime in modo da poter capire come i clienti utilizzano il nostro sito e apportare miglioramenti. I cookie essenziali non possono essere disattivati, ma puoi fare clic su \"Personalizza\" o \"Rifiuta\" per rifiutare i cookie prestazionali.

Se sei d'accordo, AWS e le terze parti approvate utilizzeranno i cookie anche per fornire utili funzionalità del sito, ricordare le tue preferenze e visualizzare contenuti pertinenti, inclusa la pubblicità pertinente. Per continuare senza accettare questi cookie, fai clic su \"Continua\" o \"Rifiuta\". Per effettuare scelte più dettagliate o saperne di più, fai clic su \"Personalizza\".

Utilizzo CloudFormation con Managed Service per Apache Flink

Modalità Focus
Utilizzo CloudFormation con Managed Service per Apache Flink - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

L'esercizio seguente mostra come avviare un'applicazione Flink creata AWS CloudFormation utilizzando una funzione Lambda nello stesso stack.

Prima di iniziare

Prima di iniziare questo esercizio, seguite i passaggi per creare un'applicazione Flink utilizzando at. AWS CloudFormation AWS::KinesisAnalytics::Application

Scrivere una funzione Lambda

Per avviare un'applicazione Flink dopo averla creata o aggiornata, utilizziamo l'API kinesisanalyticsv2 start-application. La chiamata verrà attivata da un AWS CloudFormation evento dopo la creazione dell'applicazione Flink. Discuteremo come configurare lo stack per attivare la funzione Lambda più avanti in questo esercizio, ma prima concentriamoci sulla dichiarazione della funzione Lambda e sul relativo codice. In questo esempio utilizziamo il runtime di Python3.8.

StartApplicationLambda: Type: AWS::Lambda::Function DependsOn: StartApplicationLambdaRole Properties: Description: Starts an application when invoked. Runtime: python3.8 Role: !GetAtt StartApplicationLambdaRole.Arn Handler: index.lambda_handler Timeout: 30 Code: ZipFile: | import logging import cfnresponse import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): logger.info('Incoming CFN event {}'.format(event)) try: application_name = event['ResourceProperties']['ApplicationName'] # filter out events other than Create or Update, # you can also omit Update in order to start an application on Create only. if event['RequestType'] not in ["Create", "Update"]: logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # use kinesisanalyticsv2 API to start an application. client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region']) # get application status. describe_response = client_kda.describe_application(ApplicationName=application_name) application_status = describe_response['ApplicationDetail']['ApplicationStatus'] # an application can be started from 'READY' status only. if application_status != 'READY': logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # create RunConfiguration. run_configuration = { 'ApplicationRestoreConfiguration': { 'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT', } } logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) # this call doesn't wait for an application to transfer to 'RUNNING' state. client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration) logger.info('Started Application: {}'.format(application_name)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) except Exception as err: logger.error(err) cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})

Nel codice precedente, Lambda elabora gli eventi in AWS CloudFormation arrivo, filtra tutto Create e, ottiene lo stato dell'applicazione Update e, se lo stato lo è, la avvia. READY Per ottenere lo stato dell'applicazione, è necessario creare il ruolo Lambda, come illustrato di seguito.

Creare un ruolo Lambda

Crei un ruolo per Lambda per "parlare" con successo con l'applicazione e scrivere i log. Questo ruolo utilizza politiche gestite predefinite, ma potresti voler restringere il campo all'utilizzo di politiche personalizzate.

StartApplicationLambdaRole: Type: AWS::IAM::Role DependsOn: TestFlinkApplication Properties: Description: A role for lambda to use while interacting with an application. AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess Path: /

Tieni presente che le risorse Lambda verranno create dopo la creazione dell'applicazione Flink nello stesso stack, poiché dipendono da essa.

Richiama la funzione Lambda

Ora non resta che richiamare la funzione Lambda. A tale scopo, è possibile utilizzare una risorsa personalizzata.

StartApplicationLambdaInvoke: Description: Invokes StartApplicationLambda to start an application. Type: AWS::CloudFormation::CustomResource DependsOn: StartApplicationLambda Version: "1.0" Properties: ServiceToken: !GetAtt StartApplicationLambda.Arn Region: !Ref AWS::Region ApplicationName: !Ref TestFlinkApplication

Questo è tutto ciò che serve per avviare l'applicazione Flink usando Lambda. Ora sei pronto per creare il tuo stack o utilizzare l'esempio completo riportato di seguito per vedere come funzionano nella pratica tutti questi passaggi.

L'esempio seguente è una versione leggermente estesa dei passaggi precedenti con un'ulteriore RunConfiguration regolazione effettuata tramite i parametri del modello. Questo è uno stack funzionante di prova. Assicurati di leggere le note di accompagnamento:

stack.yaml

Description: 'kinesisanalyticsv2 CloudFormation Test Application' Parameters: ApplicationRestoreType: Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT. Type: String Default: SKIP_RESTORE_FROM_SNAPSHOT AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ] SnapshotName: Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType. Type: String Default: '' AllowNonRestoredState: Description: FlinkRunConfiguration option, can be true or false. Default: true Type: String AllowedValues: [ true, false ] CodeContentBucketArn: Description: ARN of a bucket with application code. Type: String CodeContentFileKey: Description: A jar filename with an application code inside a bucket. Type: String Conditions: IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ] Resources: TestServiceExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - kinesisanlaytics.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonKinesisFullAccess - arn:aws:iam::aws:policy/AmazonS3FullAccess Path: / InputKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 OutputKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1 TestFlinkApplication: Type: 'AWS::kinesisanalyticsv2::Application' Properties: ApplicationName: 'CFNTestFlinkApplication' ApplicationDescription: 'Test Flink Application' RuntimeEnvironment: 'FLINK-1_18' ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn ApplicationConfiguration: EnvironmentProperties: PropertyGroups: - PropertyGroupId: 'KinesisStreams' PropertyMap: INPUT_STREAM_NAME: !Ref InputKinesisStream OUTPUT_STREAM_NAME: !Ref OutputKinesisStream AWS_REGION: !Ref AWS::Region FlinkApplicationConfiguration: CheckpointConfiguration: ConfigurationType: 'CUSTOM' CheckpointingEnabled: True CheckpointInterval: 1500 MinPauseBetweenCheckpoints: 500 MonitoringConfiguration: ConfigurationType: 'CUSTOM' MetricsLevel: 'APPLICATION' LogLevel: 'INFO' ParallelismConfiguration: ConfigurationType: 'CUSTOM' Parallelism: 1 ParallelismPerKPU: 1 AutoScalingEnabled: True ApplicationSnapshotConfiguration: SnapshotsEnabled: True ApplicationCodeConfiguration: CodeContent: S3ContentLocation: BucketARN: !Ref CodeContentBucketArn FileKey: !Ref CodeContentFileKey CodeContentType: 'ZIPFILE' StartApplicationLambdaRole: Type: AWS::IAM::Role DependsOn: TestFlinkApplication Properties: Description: A role for lambda to use while interacting with an application. AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess Path: / StartApplicationLambda: Type: AWS::Lambda::Function DependsOn: StartApplicationLambdaRole Properties: Description: Starts an application when invoked. Runtime: python3.8 Role: !GetAtt StartApplicationLambdaRole.Arn Handler: index.lambda_handler Timeout: 30 Code: ZipFile: | import logging import cfnresponse import boto3 logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): logger.info('Incoming CFN event {}'.format(event)) try: application_name = event['ResourceProperties']['ApplicationName'] # filter out events other than Create or Update, # you can also omit Update in order to start an application on Create only. if event['RequestType'] not in ["Create", "Update"]: logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # use kinesisanalyticsv2 API to start an application. client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region']) # get application status. describe_response = client_kda.describe_application(ApplicationName=application_name) application_status = describe_response['ApplicationDetail']['ApplicationStatus'] # an application can be started from 'READY' status only. if application_status != 'READY': logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) return # create RunConfiguration from passed parameters. run_configuration = { 'FlinkRunConfiguration': { 'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true' }, 'ApplicationRestoreConfiguration': { 'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'], } } # add SnapshotName to RunConfiguration if specified. if event['ResourceProperties']['SnapshotName'] != '': run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName'] logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) # this call doesn't wait for an application to transfer to 'RUNNING' state. client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration) logger.info('Started Application: {}'.format(application_name)) cfnresponse.send(event, context, cfnresponse.SUCCESS, {}) except Exception as err: logger.error(err) cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)}) StartApplicationLambdaInvoke: Description: Invokes StartApplicationLambda to start an application. Type: AWS::CloudFormation::CustomResource DependsOn: StartApplicationLambda Version: "1.0" Properties: ServiceToken: !GetAtt StartApplicationLambda.Arn Region: !Ref AWS::Region ApplicationName: !Ref TestFlinkApplication ApplicationRestoreType: !Ref ApplicationRestoreType SnapshotName: !Ref SnapshotName AllowNonRestoredState: !Ref AllowNonRestoredState

Potresti voler modificare i ruoli per Lambda e per l'applicazione stessa.

Prima di creare lo stack di cui sopra, non dimenticare di specificare i parametri.

parameters.json

[ { "ParameterKey": "CodeContentBucketArn", "ParameterValue": "YOUR_BUCKET_ARN" }, { "ParameterKey": "CodeContentFileKey", "ParameterValue": "YOUR_JAR" }, { "ParameterKey": "ApplicationRestoreType", "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT" }, { "ParameterKey": "AllowNonRestoredState", "ParameterValue": "true" } ]

Sostituisci YOUR_BUCKET_ARN e YOUR_JAR con i tuoi requisiti specifici. Puoi seguire questa guida per creare un bucket Amazon S3 e un jar di applicazioni.

Ora crea lo stack (sostituisci YOUR_REGION con una regione a tua scelta, ad esempio us-east-1):

aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM

Ora puoi accedere a https://console.aws.amazon.com/cloudformation e visualizzare i progressi. Una volta creata, dovresti vedere la tua applicazione Flink nello stato Starting. Potrebbero essere necessari alcuni minuti prima dell'Running.

Per ulteriori informazioni, consulta gli argomenti seguenti:

PrivacyCondizioni del sitoPreferenze cookie
© 2025, Amazon Web Services, Inc. o società affiliate. Tutti i diritti riservati.