Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.
Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
L'esercizio seguente mostra come avviare un'applicazione Flink creata AWS CloudFormation utilizzando una funzione Lambda nello stesso stack.
Prima di iniziare
Prima di iniziare questo esercizio, seguite i passaggi per creare un'applicazione Flink utilizzando at. AWS CloudFormation AWS::KinesisAnalytics::Application
Scrivere una funzione Lambda
Per avviare un'applicazione Flink dopo averla creata o aggiornata, utilizziamo l'API kinesisanalyticsv2 start-application. La chiamata verrà attivata da un AWS CloudFormation evento dopo la creazione dell'applicazione Flink. Discuteremo come configurare lo stack per attivare la funzione Lambda più avanti in questo esercizio, ma prima concentriamoci sulla dichiarazione della funzione Lambda e sul relativo codice. In questo esempio utilizziamo il runtime di Python3.8
.
StartApplicationLambda:
Type: AWS::Lambda::Function
DependsOn: StartApplicationLambdaRole
Properties:
Description: Starts an application when invoked.
Runtime: python3.8
Role: !GetAtt StartApplicationLambdaRole.Arn
Handler: index.lambda_handler
Timeout: 30
Code:
ZipFile: |
import logging
import cfnresponse
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info('Incoming CFN event {}'.format(event))
try:
application_name = event['ResourceProperties']['ApplicationName']
# filter out events other than Create or Update,
# you can also omit Update in order to start an application on Create only.
if event['RequestType'] not in ["Create", "Update"]:
logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
return
# use kinesisanalyticsv2 API to start an application.
client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
# get application status.
describe_response = client_kda.describe_application(ApplicationName=application_name)
application_status = describe_response['ApplicationDetail']['ApplicationStatus']
# an application can be started from 'READY' status only.
if application_status != 'READY':
logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
return
# create RunConfiguration.
run_configuration = {
'ApplicationRestoreConfiguration': {
'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
}
}
logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration))
# this call doesn't wait for an application to transfer to 'RUNNING' state.
client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
logger.info('Started Application: {}'.format(application_name))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
except Exception as err:
logger.error(err)
cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
Nel codice precedente, Lambda elabora gli eventi in AWS CloudFormation arrivo, filtra tutto Create
e, ottiene lo stato dell'applicazione Update
e, se lo stato lo è, la avvia. READY
Per ottenere lo stato dell'applicazione, è necessario creare il ruolo Lambda, come illustrato di seguito.
Creare un ruolo Lambda
Crei un ruolo per Lambda per "parlare" con successo con l'applicazione e scrivere i log. Questo ruolo utilizza politiche gestite predefinite, ma potresti voler restringere il campo all'utilizzo di politiche personalizzate.
StartApplicationLambdaRole:
Type: AWS::IAM::Role
DependsOn: TestFlinkApplication
Properties:
Description: A role for lambda to use while interacting with an application.
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
Path: /
Tieni presente che le risorse Lambda verranno create dopo la creazione dell'applicazione Flink nello stesso stack, poiché dipendono da essa.
Richiama la funzione Lambda
Ora non resta che richiamare la funzione Lambda. A tale scopo, è possibile utilizzare una risorsa personalizzata.
StartApplicationLambdaInvoke:
Description: Invokes StartApplicationLambda to start an application.
Type: AWS::CloudFormation::CustomResource
DependsOn: StartApplicationLambda
Version: "1.0"
Properties:
ServiceToken: !GetAtt StartApplicationLambda.Arn
Region: !Ref AWS::Region
ApplicationName: !Ref TestFlinkApplication
Questo è tutto ciò che serve per avviare l'applicazione Flink usando Lambda. Ora sei pronto per creare il tuo stack o utilizzare l'esempio completo riportato di seguito per vedere come funzionano nella pratica tutti questi passaggi.
Rivedi un esempio esteso
L'esempio seguente è una versione leggermente estesa dei passaggi precedenti con un'ulteriore RunConfiguration
regolazione effettuata tramite i parametri del modello. Questo è uno stack funzionante di prova. Assicurati di leggere le note di accompagnamento:
stack.yaml
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
ApplicationRestoreType:
Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
Type: String
Default: SKIP_RESTORE_FROM_SNAPSHOT
AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
SnapshotName:
Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
Type: String
Default: ''
AllowNonRestoredState:
Description: FlinkRunConfiguration option, can be true or false.
Default: true
Type: String
AllowedValues: [ true, false ]
CodeContentBucketArn:
Description: ARN of a bucket with application code.
Type: String
CodeContentFileKey:
Description: A jar filename with an application code inside a bucket.
Type: String
Conditions:
IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
TestServiceExecutionRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- kinesisanlaytics.amazonaws.com
Action: sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonKinesisFullAccess
- arn:aws:iam::aws:policy/AmazonS3FullAccess
Path: /
InputKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
OutputKinesisStream:
Type: AWS::Kinesis::Stream
Properties:
ShardCount: 1
TestFlinkApplication:
Type: 'AWS::kinesisanalyticsv2::Application'
Properties:
ApplicationName: 'CFNTestFlinkApplication'
ApplicationDescription: 'Test Flink Application'
RuntimeEnvironment: 'FLINK-1_18'
ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
ApplicationConfiguration:
EnvironmentProperties:
PropertyGroups:
- PropertyGroupId: 'KinesisStreams'
PropertyMap:
INPUT_STREAM_NAME: !Ref InputKinesisStream
OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
AWS_REGION: !Ref AWS::Region
FlinkApplicationConfiguration:
CheckpointConfiguration:
ConfigurationType: 'CUSTOM'
CheckpointingEnabled: True
CheckpointInterval: 1500
MinPauseBetweenCheckpoints: 500
MonitoringConfiguration:
ConfigurationType: 'CUSTOM'
MetricsLevel: 'APPLICATION'
LogLevel: 'INFO'
ParallelismConfiguration:
ConfigurationType: 'CUSTOM'
Parallelism: 1
ParallelismPerKPU: 1
AutoScalingEnabled: True
ApplicationSnapshotConfiguration:
SnapshotsEnabled: True
ApplicationCodeConfiguration:
CodeContent:
S3ContentLocation:
BucketARN: !Ref CodeContentBucketArn
FileKey: !Ref CodeContentFileKey
CodeContentType: 'ZIPFILE'
StartApplicationLambdaRole:
Type: AWS::IAM::Role
DependsOn: TestFlinkApplication
Properties:
Description: A role for lambda to use while interacting with an application.
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- sts:AssumeRole
ManagedPolicyArns:
- arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
- arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
Path: /
StartApplicationLambda:
Type: AWS::Lambda::Function
DependsOn: StartApplicationLambdaRole
Properties:
Description: Starts an application when invoked.
Runtime: python3.8
Role: !GetAtt StartApplicationLambdaRole.Arn
Handler: index.lambda_handler
Timeout: 30
Code:
ZipFile: |
import logging
import cfnresponse
import boto3
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
logger.info('Incoming CFN event {}'.format(event))
try:
application_name = event['ResourceProperties']['ApplicationName']
# filter out events other than Create or Update,
# you can also omit Update in order to start an application on Create only.
if event['RequestType'] not in ["Create", "Update"]:
logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType']))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
return
# use kinesisanalyticsv2 API to start an application.
client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
# get application status.
describe_response = client_kda.describe_application(ApplicationName=application_name)
application_status = describe_response['ApplicationDetail']['ApplicationStatus']
# an application can be started from 'READY' status only.
if application_status != 'READY':
logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
return
# create RunConfiguration from passed parameters.
run_configuration = {
'FlinkRunConfiguration': {
'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
},
'ApplicationRestoreConfiguration': {
'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
}
}
# add SnapshotName to RunConfiguration if specified.
if event['ResourceProperties']['SnapshotName'] != '':
run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration))
# this call doesn't wait for an application to transfer to 'RUNNING' state.
client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
logger.info('Started Application: {}'.format(application_name))
cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
except Exception as err:
logger.error(err)
cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
StartApplicationLambdaInvoke:
Description: Invokes StartApplicationLambda to start an application.
Type: AWS::CloudFormation::CustomResource
DependsOn: StartApplicationLambda
Version: "1.0"
Properties:
ServiceToken: !GetAtt StartApplicationLambda.Arn
Region: !Ref AWS::Region
ApplicationName: !Ref TestFlinkApplication
ApplicationRestoreType: !Ref ApplicationRestoreType
SnapshotName: !Ref SnapshotName
AllowNonRestoredState: !Ref AllowNonRestoredState
Potresti voler modificare i ruoli per Lambda e per l'applicazione stessa.
Prima di creare lo stack di cui sopra, non dimenticare di specificare i parametri.
parameters.json
[
{
"ParameterKey": "CodeContentBucketArn",
"ParameterValue": "YOUR_BUCKET_ARN"
},
{
"ParameterKey": "CodeContentFileKey",
"ParameterValue": "YOUR_JAR"
},
{
"ParameterKey": "ApplicationRestoreType",
"ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
},
{
"ParameterKey": "AllowNonRestoredState",
"ParameterValue": "true"
}
]
Sostituisci YOUR_BUCKET_ARN
e YOUR_JAR
con i tuoi requisiti specifici. Puoi seguire questa guida per creare un bucket Amazon S3 e un jar di applicazioni.
Ora crea lo stack (sostituisci YOUR_REGION con una regione a tua scelta, ad esempio us-east-1):
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
Ora puoi accedere a https://console.aws.amazon.com/cloudformationStarting
. Potrebbero essere necessari alcuni minuti prima dell'Running
.
Per ulteriori informazioni, consulta gli argomenti seguenti: