Utilizzo di Lambda con Apache Kafka gestito dal cliente - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di Lambda con Apache Kafka gestito dal cliente

Lambda supporta Apache Kafka come origine eventi. Apache Kafka è una piattaforma di flussi di eventi open source che supporta carichi di lavoro come pipeline di dati e analisi dei dati di streaming.

Puoi utilizzare il servizio Kafka AWS gestito Amazon Managed Streaming for Apache Kafka (Amazon MSK) o un cluster Kafka autogestito. Per informazioni dettagliate sull'uso di Lambda con Amazon MSK, consultare Uso di Lambda con Amazon MSK.

Questo argomento descrive come utilizzare Lambda con un cluster Kafka autogestito. In AWS terminologia, un cluster autogestito include cluster Kafka non ospitati.AWS Ad esempio, è possibile ospitare il proprio cluster Kafka con un provider cloud come Confluent Cloud.

Apache Kafka come origine eventi funziona in modo simile all'utilizzo di Amazon Simple Queue Service (Amazon SQS) o Amazon Kinesis. Lambda interroga internamente i nuovi messaggi dell'origine eventi, quindi richiama in modo sincrono la funzione Lambda di destinazione. Lambda legge i messaggi in batch e li fornisce alla funzione come payload di evento. La dimensione massima del batch è configurabile. (L'impostazione predefinita è 100 messaggi.)

avvertimento

Le mappature delle sorgenti degli eventi Lambda elaborano ogni evento almeno una volta e può verificarsi un'elaborazione duplicata dei record. Per evitare potenziali problemi legati agli eventi duplicati, ti consigliamo vivamente di rendere idempotente il codice della funzione. Per ulteriori informazioni, consulta Come posso rendere idempotente la mia funzione Lambda nel Knowledge Center. AWS

Per le origini eventi basate su KAFKA, Lambda supporta i parametri di controllo dell'elaborazione, come le finestre di batch e le dimensioni del batch. Per ulteriori informazioni, consulta Comportamento di batching.

Per un esempio di come utilizzare Kafka autogestito come fonte di eventi, vedi Utilizzo di Apache Kafka ospitato autonomamente come fonte di eventi per sul blog di Compute. AWS Lambda AWS

Esempio di evento

Lambda invia il batch di messaggi nel parametro evento quando richiama la funzione Lambda. Il payload evento contiene un array di messaggi. Ogni elemento dell'array contiene i dettagli dell'argomento Kafka e dell'identificatore della partizione Kafka, insieme a una data/ora e a un messaggio con codifica base64.

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Autenticazione cluster Kafka

Lambda supporta diversi metodi per l'autenticazione al cluster Apache Kafka autogestito. Assicurarsi di configurare il cluster Kafka in modo da utilizzare uno dei seguenti metodi di autenticazione supportati. Per ulteriori informazioni sulla sicurezza con Kafka, consultare la sezione Sicurezza della documentazione di Kafka.

Accesso VPC

Se accedono ai broker Kafka soltanto gli utenti Kafka all'interno del VPC, bisogna configurare la fonte evento Kafka per l'accesso ad Amazon Virtual Private Cloud (Amazon VPC).

Autenticazione SASL/SCRAM

Lambda supporta l'autenticazione Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) con crittografia Transport Layer Security (TLS) (SASL_SSL). Lambda invia le credenziali crittografate per l'autenticazione con il cluster. Lambda non supporta SASL/SCRAM con testo in chiaro (SASL_PLAINTEXT). Per ulteriori informazioni sull'autenticazione SASL/SCRAM, consultare RFC 5802.

Lambda supporta anche l'autenticazione SASL/PLAIN. Poiché questo meccanismo utilizza credenziali in chiaro, la connessione al server deve utilizzare la crittografia TLS per garantire che le credenziali siano protette.

Per l'autenticazione SASL, è necessario archiviare le credenziali di accesso come segreto in AWS Secrets Manager. Per ulteriori informazioni sull'uso di Secrets Manager, consultare Tutorial: Creare e recuperare un segreto nella Guida per l'utente AWS Secrets Manager .

Importante

Per utilizzare Secrets Manager per l'autenticazione, i segreti devono essere archiviati nella stessa AWS area della funzione Lambda.

Autenticazione TLS reciproca

MTLS (Mutual TLS) fornisce l'autenticazione bidirezionale tra client e server. Il client invia un certificato al server affinché il server verifichi il client e il server invia un certificato al client affinché il client verifichi il server.

In Apache Kafka autogestito Lambda agisce come client. È possibile configurare un certificato client (come segreto in Secrets Manager) per autenticare Lambda con i broker Kafka. Il certificato client deve essere firmato da una CA nell'archivio trust del server.

Il cluster Kafka invia un certificato server a Lambda per autenticare i broker con Lambda. Il certificato del server può essere un certificato CA pubblico o un certificato CA/autofirmato privato. Il certificato emesso da una CA pubblica deve essere firmato da un'autorità di certificazione (CA) presente nel trust store di Lambda. Per un certificato CA/autofirmato privato, è possibile configurare il certificato CA root del server (come segreto in Secrets Manager). Lambda utilizza il certificato root per verificare i broker Kafka.

Per ulteriori informazioni su mTLS, consultare Introduzione dell'autenticazione TLS reciproca per Amazon MSK come origine eventi.

Configurazione del segreto del certificato client

Il segreto CLIENT_CERTIFICATE_TLS_AUTH richiede un campo certificato e un campo chiave privata. Per una chiave privata crittografata, il segreto richiede una password per chiave privata. Il certificato e la chiave privata devono essere in formato PEM.

Nota

Lambda supporta il PBES1 (ma non PBES2) come algoritmi di crittografia a chiave privata.

Il campo certificato deve contenere un elenco di certificati, a partire dal certificato client, seguito da qualsiasi certificato intermedio, per finire con il certificato root. Ogni certificato deve iniziare su una nuova riga con la struttura seguente:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager supporta segreti fino a 65.536 byte, che è uno spazio sufficiente per lunghe catene di certificati.

La chiave privata deve essere in formato PKCS #8, con la struttura seguente:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Per una chiave privata crittografata, utilizza la struttura seguente:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

Nell'esempio seguente viene mostrato il contenuto di un segreto per l'autenticazione mTLS utilizzando una chiave privata crittografata. Per una chiave privata crittografata, includere la password per chiave privata nel segreto.

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Configurazione del segreto del certificato CA root del server

Questo segreto viene creato se i broker Kafka utilizzano la crittografia TLS con certificati firmati da una CA privata. È possibile utilizzare la crittografia TLS per l'autenticazione VPC, SASL/SCRAM, SASL/PLAIN o mTLS.

Il segreto del certificato CA root del server richiede un campo che contiene il certificato CA root del broker Kafka in formato PEM. Il seguente esempio illustra la struttura del segreto.

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

Gestione dell'accesso e delle autorizzazioni API

Oltre ad accedere al cluster Kafka autogestito, la funzione Lambda necessita di autorizzazioni per eseguire varie operazioni API. Aggiungere queste autorizzazioni al ruolo di esecuzione della funzione. Se i tuoi utenti devono accedere a qualsiasi azione API, aggiungi le autorizzazioni richieste alla politica di identità per l'utente o il AWS Identity and Access Management ruolo (IAM).

Autorizzazioni necessarie per la funzione Lambda

Per creare e archiviare i log in un gruppo di log in Amazon CloudWatch Logs, la funzione Lambda deve disporre delle seguenti autorizzazioni nel ruolo di esecuzione:

Autorizzazioni facoltative per la funzione Lambda

La funzione Lambda potrebbe richiedere autorizzazioni per:

  • Descrivere il segreto di Secrets Manager.

  • Accedi alla tua chiave AWS Key Management Service (AWS KMS) gestita dal cliente.

  • Accedere ad Amazon VPC.

  • Invia i record delle chiamate non riuscite a una destinazione.

Secrets Manager e AWS KMS autorizzazioni

A seconda del tipo di controllo degli accessi che stai configurando per i tuoi broker Kafka, la tua funzione Lambda potrebbe richiedere l'autorizzazione per accedere al tuo segreto di Secrets Manager o per decrittografare la tua chiave gestita dal cliente. AWS KMS Per accedere a queste risorse, il ruolo di esecuzione della funzione deve disporre delle seguenti autorizzazioni:

Autorizzazioni VPC

Se soltanto gli utenti all'interno di un VPC possono accedere al cluster Apache Kafka autogestito, la funzione Lambda deve disporre dell'autorizzazione per accedere alle risorse di Amazon VPC. Queste risorse includono la VPC, le sottoreti, i gruppi di sicurezza e le interfacce di rete. Per accedere a queste risorse, il ruolo di esecuzione della funzione deve disporre delle seguenti autorizzazioni:

Aggiunta di autorizzazioni al ruolo di esecuzione

Per accedere ad altri AWS servizi utilizzati dal cluster Apache Kafka autogestito, Lambda utilizza le politiche di autorizzazione definite nel ruolo di esecuzione della funzione Lambda.

Per impostazione predefinita, Lambda non è autorizzato a eseguire le operazioni richieste o facoltative per un cluster Apache Kafka autogestito. Dovrai creare e definire queste operazioni in una policy di attendibilità IAM e quindi collegare la policy al ruolo di esecuzione. Questo esempio mostra come creare una policy che consente a Lambda di accedere alle risorse Amazon VPC.

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

Per informazioni sulla creazione di un documento di policy JSON nella console IAM, consultare Creazione di policy nella scheda JSON nella Guida per l'utente di IAM.

Concessione di accesso agli utenti con una policy IAM

Per impostazione predefinita, gli utenti e i ruoli non dispongono dell'autorizzazione per eseguire operazioni API di origine eventi. Per concedere l'accesso agli utenti dell'organizzazione o dell'account, è possibile creare o aggiornare una policy basata sull'identità. Per ulteriori informazioni, consulta Controlling access to AWS resources using policies nella IAM User Guide.

Errori di autenticazione e autorizzazione

Se manca una delle autorizzazioni necessarie per consumare i dati dal cluster Kafka, Lambda visualizza uno dei seguenti messaggi di errore nella mappatura delle sorgenti degli eventi in Risultato. LastProcessing

Il cluster non è riuscito ad autorizzare Lambda

Per SASL/SCRAM o mTLS, questo errore indica che l'utente fornito non dispone di tutte le seguenti autorizzazioni della lista di controllo accessi Kafka (ACL) richieste:

  • DescribeConfigs Cluster

  • Descrivi il gruppo

  • Leggi il gruppo

  • Descrivi l'argomento

  • Leggi l'argomento

Quando si crea ACL Kafka con le autorizzazioni kafka-cluster richieste, è necessario specificare l'argomento e il gruppo come risorse. Il nome dell'argomento deve corrispondere all'argomento nella mappatura dell'origine eventi. Il nome del gruppo deve corrispondere all'UUID della mappatura dell'origine eventi.

Dopo avere aggiunto le autorizzazioni richieste al ruolo di esecuzione, potrebbero essere necessari alcuni minuti affinché le modifiche entrino in vigore.

Autenticazione SASL non riuscita

Per SASL/SCRAM o SASL/PLAIN, questo errore indica che le credenziali di accesso fornite non sono valide.

Il server non è riuscito ad autenticare Lambda

Questo errore indica che il broker Kafka non è riuscito ad autenticare Lambda. Questo errore può verificarsi per uno dei seguenti motivi:

  • Non è stato fornito un certificato client per l'autenticazione mTLS.

  • È stato fornito un certificato client, ma i broker Kafka non sono configurati per l'utilizzo dell'autenticazione mTLS.

  • Un certificato client non è attendibile per i broker Kafka.

Lambda non è riuscita ad autenticare il server

Questo errore indica che Lambda non è riuscita ad autenticare il broker Kafka. Questo errore può verificarsi per uno dei seguenti motivi:

  • I broker Kafka utilizzano certificati autofirmati o una CA privata, ma non hanno fornito il certificato CA root del server.

  • Il certificato CA root del server non corrisponde alla CA root che ha firmato il certificato del broker.

  • La convalida del nome host non è riuscita perché il certificato del broker non contiene il nome DNS o l'indirizzo IP del broker come nome alternativo dell'oggetto.

Il certificato o la chiave privata forniti non sono validi

Questo errore indica che il consumatore Kafka non ha potuto utilizzare il certificato o la chiave privata fornita. Assicurati che il certificato e la chiave utilizzino il formato PEM e che la crittografia della chiave privata utilizzi un algoritmo PBES1.

Configurazione della rete

Affinché Lambda utilizzi il cluster Kafka come origine di eventi, Lambda deve accedere all'Amazon VPC in cui risiede il cluster. Ti consigliamo di implementare gli endpoint AWS PrivateLink VPC per Lambda per accedere al tuo VPC. Distribuisci endpoint per Lambda e (). AWS Security Token Service AWS STS Se il broker utilizza l'autenticazione, implementa anche un endpoint VPC per Secrets Manager. Se hai configurato una destinazione in caso di errore, implementa anche un endpoint VPC per il servizio di destinazione.

In alternativa, verifica che il VPC associato al cluster Kafka includa un gateway NAT per sottorete pubblica. Per ulteriori informazioni, consulta Abilita l'accesso a Internet per le funzioni Lambda connesse a VPC.

Se utilizzi endpoint VPC, devi anche configurarli in modo da abilitare i nomi DNS privati.

Quando crei una mappatura dell'origine degli eventi per un cluster Apache Kafka autogestito, Lambda verifica se le interfacce di rete elastiche (ENI) sono già presenti per le sottoreti e i gruppi di sicurezza del VPC del cluster. Se Lambda trova ENI esistenti, tenta di riutilizzarli. Altrimenti, Lambda crea nuovi ENI per connettersi all'origine dell'evento e richiamare la tua funzione.

Nota

Le funzioni Lambda vengono sempre eseguite all'interno di VPC di proprietà del servizio Lambda. Questi VPC vengono gestiti automaticamente dal servizio e non sono visibili ai clienti. Puoi anche connettere la tua funzione a un Amazon VPC. In entrambi i casi, la configurazione VPC della funzione non influisce sulla mappatura delle sorgenti degli eventi. Solo la configurazione del VPC dell'origine dell'evento determina il modo in cui Lambda si connette alla fonte dell'evento.

Per ulteriori informazioni sulla configurazione della rete, consulta Configurazione AWS Lambda con un cluster Apache Kafka all'interno di un VPC sul blog di Compute. AWS

Regole del gruppo di sicurezza VPC

Configura i gruppi di sicurezza per l'Amazon VPC contenente il tuo cluster con le seguenti regole (come minimo):

  • Regole in entrata – Consenti tutto il traffico sulla porta del broker Kafka per i gruppi di sicurezza specificati per l'origine eventi. Kafka utilizza la porta 9092 per impostazione predefinita.

  • Regole in uscita: consenti tutto il traffico sulla porta 443 per tutte le destinazioni. Consenti tutto il traffico sulla porta del broker Kafka per i gruppi di sicurezza specificati per l'origine eventi. Kafka utilizza la porta 9092 per impostazione predefinita.

  • Se si utilizzano endpoint VPC anziché gateway NAT, i gruppi di sicurezza associati agli endpoint VPC devono consentire tutto il traffico in entrata sulla porta 443 dai gruppi di sicurezza dell'origine eventi.

Uso di endpoint VPC

Quando utilizzi gli endpoint VPC, le chiamate API per richiamare la tua funzione vengono instradate attraverso questi endpoint utilizzando gli ENI. Il responsabile del servizio Lambda deve chiamare sts:AssumeRole e lambda:InvokeFunction attivare tutti i ruoli e le funzioni che utilizzano tali ENI.

Per impostazione predefinita, gli endpoint VPC dispongono di policy IAM aperte. La migliore pratica consiste nel limitare queste policy per consentire solo a soggetti specifici di eseguire le azioni necessarie utilizzando quell'endpoint. Per garantire che la mappatura delle sorgenti degli eventi sia in grado di richiamare la funzione Lambda, la policy degli endpoint VPC deve consentire al principio del servizio Lambda di chiamare e. sts:AssumeRole lambda:InvokeFunction La limitazione delle policy degli endpoint VPC per consentire solo le chiamate API provenienti dall'organizzazione impedisce il corretto funzionamento della mappatura delle sorgenti degli eventi.

Il seguente esempio di policy degli endpoint VPC mostra come concedere l'accesso richiesto al principale del servizio Lambda per gli endpoint Lambda e Lambda. AWS STS

Esempio Politica degli endpoint VPC - endpoint AWS STS
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
Esempio Politica degli endpoint VPC - Endpoint Lambda
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Se il tuo broker Kafka utilizza l'autenticazione, puoi anche limitare la policy degli endpoint VPC per l'endpoint Secrets Manager. Per chiamare l'API Secrets Manager, Lambda utilizza il ruolo della funzione, non il responsabile del servizio Lambda. L'esempio seguente mostra una policy per gli endpoint di Secrets Manager.

Esempio Politica degli endpoint VPC - Endpoint Secrets Manager
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

Se hai configurato una destinazione in caso di errore, Lambda utilizza anche il ruolo della tua funzione per chiamare una delle s3:PutObject due sqs:sendMessage o utilizzare gli sns:Publish ENI gestiti da Lambda.

Aggiunta di un cluster Kafka come origine eventi

Per creare una mappatura dell'origine eventi, aggiungi il cluster Kafka come trigger della funzione Lambda utilizzando la console Lambda, un SDK AWS o AWS Command Line Interface (AWS CLI).

Questa sezione descrive come creare una mappatura dell'origine eventi utilizzando la console Lambda e AWS CLI.

Prerequisiti

  • Un cluster Apache Kafka autogestito. Lambda supporta la versione 0.10.1.0 e successive di Apache Kafka.

  • Un ruolo di esecuzione con autorizzazione ad accedere alle AWS risorse utilizzate dal cluster Kafka autogestito.

ID gruppo di consumer personalizzabile

Quando configuri Kafka come origine eventi, puoi specificare un ID gruppo di consumer. Questo ID gruppo di consumer è un identificatore esistente per il gruppo di consumer Kafka a cui desideri che la tua funzione Lambda aderisca. Puoi utilizzare questa funzione per migrare senza problemi qualsiasi configurazione di elaborazione dei record Kafka in corso da altri utenti a Lambda.

Se specifichi l'ID gruppo di consumer e sono presenti altri sondaggi attivi all'interno di quel gruppo di consumer, Kafka distribuisce i messaggi a tutti i consumer. In altre parole, Lambda non riceve tutti i messaggi relativi all'argomento Kafka. Se desideri che Lambda gestisca tutti i messaggi dell'argomento, disattiva tutti gli altri sondaggi in quel gruppo di consumer.

Inoltre, se specifichi un ID gruppo di consumer e Kafka trova un gruppo di consumer esistente valido con lo stesso ID, Lambda ignora il parametro StartingPosition per la mappatura dell'origine eventi. Inizia invece ad elaborare i record in base alla compensazione impegnata del gruppo di consumer. Se specifichi un ID gruppo di consumer e Kafka non riesce a trovare un gruppo di consumer esistente, Lambda configura l'origine eventi con la StartingPosition specificata.

L'ID gruppo di consumer deve essere univoco tra tutte le origini eventi Kafka. Dopo aver creato una mappatura dell'origine eventi Kafka con l'ID del gruppo di consumer specificato, non sarà più possibile aggiornare questo valore.

Aggiunta di un cluster Kafka autogestito (console)

Segui questi passaggi per aggiungere il cluster Apache Kafka autogestito e un argomento Kafka come trigger per la funzione Lambda.

Per aggiungere un trigger Apache Kafka alla funzione Lambda (console)
  1. Aprire la pagina Funzioni della console Lambda.

  2. Scegliere il nome della funzione Lambda.

  3. In Panoramica delle funzioni, scegliere Aggiungi trigger.

  4. In Configurazione trigger, effettua le operazioni seguenti:

    1. Scegliere il tipo di trigger Apache Kafka.

    2. Per Server di bootstrap, inserisci l'indirizzo composto dalla coppia host e porta di un broker Kafka nel cluster, quindi scegli Aggiungi. Ripeti la procedura per ogni broker Kafka del cluster.

    3. Per Nome argomento, inserisci il nome dell'argomento Kafka utilizzato per memorizzare i record nel cluster.

    4. (Facoltativo) In Dimensioni batch, inserisci il numero massimo di record da ricevere in un singolo batch.

    5. Per Finestra batch, immetti il tempo massimo in secondi per la raccolta dei registri da parte di Lambda prima di richiamare la funzione.

    6. (Facoltativo) Per ID gruppo di consumer, inserisci l'ID di un gruppo di consumer Kafka a cui aderire.

    7. (Facoltativo) Per Posizione di inizio, scegli Più recente per iniziare a leggere il flusso dal record più recente, Orizzonte di taglio per iniziare dal primo record disponibile o In corrispondenza del timestamp per specificare un timestamp da cui iniziare la lettura.

    8. (Facoltativo) Per VPC, scegli Amazon VPC per il cluster Kafka. Quindi, scegli Sottoreti VPC e Gruppi di sicurezza VPC.

      Questa impostazione è necessaria soltanto se gli utenti del VPC accedono ai broker.

    9. (Facoltativo) Per Autenticazione, scegli Aggiungi e quindi esegui le seguenti operazioni:

      1. Scegli il protocollo di accesso o di autenticazione dei broker Kafka del cluster.

        • Se il broker Kafka utilizza l'autenticazione SASL/PLAIN, scegli BASIC_AUTH.

        • Se il broker utilizza l'autenticazione SASL/SCRAM, scegli uno dei protocolli SASL_SCRAM.

        • Se stai configurando l'autenticazione mTLS, scegli il protocollo CLIENT_CERTIFICATE_TLS_AUTH.

      2. Per l'autenticazione SASL/SCRAM o mTLS, scegli la chiave segreta di Secrets Manager che contiene le credenziali per il cluster Kafka.

    10. (Facoltativo) Per Crittografia, scegli il segreto di Secrets Manager contenente il certificato CA root utilizzato dai broker Kafka per la crittografia TLS, se i broker Kafka utilizzano certificati firmati da una CA privata.

      Questa impostazione si applica alla crittografia TLS per SASL/SCRAM o SASL/PLAIN e all'autenticazione MTLS.

    11. Per creare il trigger in uno stato disabilitato per il test (scelta consigliata), deselezionare Abilita trigger. Oppure, per attivare immediatamente il trigger, selezionareAbilita trigger.

  5. Per creare il trigger, scegli Aggiungi.

Aggiunta di un cluster Kafka autogestito (AWS CLI)

Usa i seguenti AWS CLI comandi di esempio per creare e visualizzare un trigger Apache Kafka autogestito per la tua funzione Lambda.

Utilizzo di SASL/SCRAM

Se gli utenti Kafka accedono ai broker Kafka tramite Internet, è necessario specificare il segreto di Secrets Manager creato per l'autenticazione SASL/SCRAM. L'esempio seguente utilizza il create-event-source-mapping AWS CLI comando per mappare una funzione Lambda denominata my-kafka-function a un argomento di Kafka denominato. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Utilizzo di un VPC

Se solo gli utenti Kafka all'interno del proprio VPC accedono ai broker Kafka, dovrai specificare VPC, sottorete e gruppo di sicurezza del VPC. L'esempio seguente utilizza il create-event-source-mapping AWS CLI comando per mappare una funzione Lambda denominata my-kafka-function a un argomento di Kafka denominato. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Visualizzazione dello stato utilizzando il AWS CLI

L'esempio seguente utilizza il get-event-source-mapping AWS CLI comando per descrivere lo stato della mappatura dell'origine degli eventi creata.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Destinazioni in caso di errore

Per mantenere i record delle chiamate non riuscite allo strumento di mappatura dell'origine degli eventi, aggiungi una destinazione allo strumento di mappatura dell'origine degli eventi della funzione. Ogni record inviato alla destinazione è un documento JSON con metadati sulla chiamata non riuscita. Puoi configurare qualsiasi argomento Amazon SNS, coda Amazon SQS o bucket S3 come destinazione. Il tuo ruolo di esecuzione deve avere le autorizzazioni per la destinazione:

Inoltre, se hai configurato una chiave KMS sulla destinazione, Lambda necessita delle seguenti autorizzazioni, a seconda del tipo di destinazione:

Per configurare una destinazione in caso di errore tramite la console, completa i seguenti passaggi:

  1. Aprire la pagina Funzioni della console Lambda.

  2. Scegliere una funzione.

  3. In Function overview (Panoramica delle funzioni), scegliere Add destination (Aggiungi destinazione).

  4. Per Origine, scegli Chiamata allo strumento di mappatura dell'origine degli eventi.

  5. Per Strumento di mappatura dell'origine degli eventi, scegli un'origine dell'evento configurata per questa funzione.

  6. Per Condizione, seleziona In caso di errore. Per le chiamate allo strumento di mappatura dell'origine degli eventi, questa è l'unica condizione accettata.

  7. Per Tipo di destinazione, scegli il tipo di destinazione a cui Lambda deve inviare i record di chiamata.

  8. Per Destination (Destinazione), scegliere una risorsa.

  9. Scegliere Save (Salva).

Puoi anche configurare una destinazione AWS CLI in caso di errore utilizzando. Ad esempio, il seguente comando create-event-source-mapping aggiunge una mappatura dell'origine degli eventi con una destinazione SQS in caso di errore a: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Il seguente comando update-event-source-mapping aggiunge una destinazione S3 in caso di errore all'origine dell'evento associata all'input: uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Per rimuovere una destinazione, fornisci una stringa vuota come argomento del parametro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Record di invocazione di esempio SNS e SQS

L'esempio seguente mostra il contenuto che Lambda invia a un argomento SNS o una coda SQS di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Ciascuna delle chiavi in recordsInfo contiene sia l'argomento sia la partizione di Kafka, separati da un trattino. Ad esempio, per la chiave "Topic-0", Topic è l'argomento di Kafka e 0 è la partizione. Per ogni argomento e partizione, è possibile utilizzare i dati di offset e timestamp per individuare i record di chiamata originali.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Record di invocazione di esempio di destinazione S3

Se le destinazioni sono S3, Lambda invia alla destinazione l'intero record di chiamata insieme ai metadati. L'esempio seguente mostra ciò che Lambda invia a un bucket S3 di destinazione per una chiamata non riuscita all'origine dell'evento Kafka. Oltre a tutti i campi dell'esempio precedente per le destinazioni SQS e SNS, il campo payload contiene il record di chiamata originale come stringa JSON con escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Suggerimento

Ti consigliamo di abilitare il controllo delle versioni S3 sul bucket di destinazione.

Aggiunta di un cluster Kafka come origine eventi

Quando aggiungi il cluster Apache Kafka come trigger per la funzione Lambda, il cluster viene utilizzato come origine eventi.

Lambda legge i dati degli eventi dagli argomenti di Kafka specificati Topics in una CreateEventSourceMappingrichiesta, in base a ciò che specifichi. StartingPosition Dopo che l'elaborazione è avvenuta con successo, l'argomento Kafka viene salvato nel cluster Kafka.

Se specifichi StartingPosition come LATEST, Lambda inizia a leggere a partire dall'ultimo messaggio in ogni partizione appartenente all'argomento. Poiché ci può essere un certo ritardo dopo la configurazione del trigger prima che Lambda inizi a leggere i messaggi, Lambda non legge alcun messaggio prodotto durante questo periodo.

Lambda elabora i registri da una o più partizioni dell'argomento Kafka specificate e invia un payload JSON alla funzione. Quando sono disponibili più record, Lambda continua a elaborare i record in batch, in base al BatchSize valore specificato in una CreateEventSourceMappingrichiesta, finché la funzione non raggiunge l'argomento.

Se la funzione restituisce un errore per uno qualunque dei messaggi in un batch, Lambda ritenta l'intero batch di messaggi fino a quando l'elaborazione riesce o i messaggi scadono. È possibile inviare i record che non superano tutti i tentativi di riprova a una destinazione in caso di errore per un'elaborazione successiva.

Nota

Anche se le funzioni Lambda generalmente prevedono un timeout massimo di 15 minuti, gli strumenti di mappatura dell'origine degli eventi per Amazon MSK, Apache Kafka autogestito, Amazon DocumentDB e Amazon MQ per ActiveMQ e RabbitMQ supportano solo funzioni con timeout massimi di 14 minuti. Questa limitazione garantisce che lo strumento di mappatura dell'origine degli eventi possa gestire correttamente errori di funzioni e nuovi tentativi.

Posizioni di partenza di polling e flussi

Tieni presente che il polling dei flussi durante la creazione e gli aggiornamenti dello strumento di mappatura dell’origine degli eventi alla fine è coerente.

  • Durante la creazione dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

  • Durante gli aggiornamenti dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

Questo comportamento implica che se specifichi LATEST come posizione iniziale del flusso, lo strumento di mappatura dell'origine degli eventi potrebbe perdere eventi durante la creazione o gli aggiornamenti. Per non perdere alcun evento, specifica la posizione iniziale del flusso come TRIM_HORIZON o AT_TIMESTAMP.

Scalabilità automatica dell'origine eventi Kafka

Quando si crea inizialmente un'origine eventi Apache Kafka, Lambda assegna un consumatore per elaborare tutte le partizioni dell'argomento Kafka. Ogni consumatore ha più processori in esecuzione in parallelo per gestire carichi di lavoro più elevati. Inoltre, Lambda aumenta o diminuisce automaticamente il numero di consumatori in base al carico di lavoro. Per preservare l'ordinamento dei messaggi in ogni partizione, il numero massimo di consumatori è un consumatore per ogni partizione dell'argomento.

Ogni minuto, Lambda valuta il ritardo dell'offset del consumatore di tutte le partizioni dell'argomento. Se il ritardo è troppo alto, la partizione sta ricevendo messaggi più velocemente di quanto Lambda possa elaborarli. Se necessario, Lambda aggiunge o rimuove i consumer dall'argomento. Il processo di dimensionamento di aggiunta o rimozione dei consumatori avviene entro tre minuti dalla valutazione.

Se la funzione Lambda di destinazione è sovraccarica, Lambda riduce il numero di consumer. Questa operazione riduce il carico di lavoro sulla funzione riducendo il numero di messaggi che i consumer possono recuperare e inviare alla funzione.

Per monitorare il throughput del proprio argomento Kafka, è possibile visualizzare i parametri dei consumer Apache Kafka, come consumer_lag e consumer_offset. Per controllare quante invocazioni di funzioni si verificano in parallelo, è inoltre possibile monitorare i parametri di concorrenza per la funzione.

Errori della mappatura dell'origine eventi

Quando aggiungi il cluster Apache Kafka come origine eventi per la funzione Lambda, se la funzione rileva un errore, il consumer Kafka arresta l'elaborazione dei record. I consumatori di una partizione dell'argomento sono quelli che sottoscrivono, leggono ed elaborano i record. Gli altri consumatori Kafka possono continuare a elaborare i record, a condizione che non riscontrino lo stesso errore.

Per determinare la causa di un consumatore interrotto, controlla il campo StateTransitionReason nella risposta di EventSourceMapping. Nell'elenco seguente vengono descritti gli errori dell'origine eventi che è possibile ricevere:

ESM_CONFIG_NOT_VALID

La configurazione della mappatura della fonte evento non è valida.

EVENT_SOURCE_AUTHN_ERROR

Lambda non ha potuto autenticare la fonte evento.

EVENT_SOURCE_AUTHZ_ERROR

Lambda non dispone delle autorizzazioni necessarie per accedere alla fonte evento.

FUNCTION_CONFIG_NOT_VALID

La configurazione della funzione non è valida.

Nota

Se i record degli eventi Lambda superano il limite di dimensione consentito di 6 MB, potrebbero non venire elaborati.

CloudWatch Metriche Amazon

Lambda emette il parametro OffsetLag mentre la funzione elabora i registri. Il valore di questo parametro è la differenza di offset tra l'ultimo registro scritto nell'argomento dell'origine eventi Kafka e l'ultimo registro elaborato da Lambda. Puoi utilizzare OffsetLag per stimare la latenza tra il momento in cui un registro viene aggiunto e il momento in cui il gruppo di consumer lo elabora.

Una tendenza in aumento in OffsetLag può indicare problemi con i sondaggi nel gruppo di consumer della funzione. Per ulteriori informazioni, consulta Utilizzo dei parametri delle funzioni Lambda.

Parametri di configurazione Apache Kafka gestiti dal cliente

Tutti i tipi di sorgenti di eventi Lambda condividono le stesse operazioni CreateEventSourceMappinge quelle dell'UpdateEventSourceMappingAPI. Tuttavia, solo alcuni dei parametri si applicano ad Apache Kafka.

Parametri di origine eventi che si applicano ad Apache Kafka autogestito
Parametro Obbligatorio Predefinito Note

BatchSize

N

100

Massimo: 10.000

Abilitato

N

Abilitato

FunctionName

Y

FilterCriteria

N

Filtro eventi Lambda

MaximumBatchingWindowInSecondi

N

500 ms

Comportamento di batching

SelfManagedEventSource

Y

Elenco dei broker Kafka. Può essere impostato solo su Create

SelfManagedKafkaEventSourceConfig

N

Contiene il ConsumerGroupId campo che per impostazione predefinita è un valore univoco.

Può essere impostato solo su Create

SourceAccessConfigurazioni

N

Nessuna credenziale

Informazioni sul VPC o credenziali di autenticazione per il cluster

Per SASL_PLAIN, impostare su BASIC_AUTH

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON o LATEST

Può essere impostato solo su Create

StartingPositionTimestamp

N

Obbligatorio se StartingPosition è impostato su AT_TIMESTAMP

Argomenti

Y

Nome argomento

Può essere impostato solo su Create