Utilizzo del filtraggio degli eventi con una sorgente di eventi Kinesis - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo del filtraggio degli eventi con una sorgente di eventi Kinesis

Puoi utilizzare il filtraggio degli eventi per controllare quali record di un flusso o di una coda Lambda invia alla funzione. Per informazioni generali su come funziona il filtraggio degli eventi, consulta. Controlla quali eventi Lambda invia alla tua funzione

Questa sezione si concentra sul filtraggio degli eventi per le sorgenti di eventi Kinesis.

Nozioni di base sul filtraggio degli eventi Kinesis

Supponiamo che un produttore stia inserendo dati JSON formattati nel flusso di dati Kinesis. Un record di esempio sarebbe simile al seguente, con i JSON dati convertiti in una stringa codificata Base64 nel campo. data

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Finché i dati che il produttore inserisce nello stream sono validiJSON, puoi utilizzare il filtro degli eventi per filtrare i record utilizzando la chiave. data Supponiamo che un produttore stia inserendo dei dischi nello stream Kinesis nel formato seguenteJSON.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

Per filtrare solo i record in cui il tipo di ordine è "acquista", l'oggetto FilterCriteria dovrebbe avere la struttura seguente.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

Per maggiore chiarezza, ecco il valore del filtro Pattern espanso in chiaro. JSON

{ "data": { "order": { "type": [ "buy" ] } } }

Puoi aggiungere il filtro utilizzando la console AWS CLI o un AWS SAM modello.

Console

Per aggiungere questo filtro utilizzando la console, segui le istruzioni riportate in Collegamento dei criteri di filtro a una mappatura dell'origine evento (console) e inserisci la seguente stringa per i criteri di filtraggio.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

Per creare una nuova mappatura dell'origine degli eventi con questi criteri di filtro utilizzando AWS Command Line Interface (AWS CLI), esegui il comando seguente.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

Per aggiungere questi criteri di filtraggio a una mappatura dell'origine degli eventi esistente, esegui il comando seguente.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

Per aggiungere questo filtro utilizzando AWS SAM, aggiungi il seguente frammento al YAML modello per la fonte dell'evento.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Per filtrare correttamente gli eventi dalle sorgenti Kinesis, sia il campo dati che i criteri di filtro per il campo dati devono essere in formato validoJSON. Se uno dei due campi non è in un JSON formato valido, Lambda elimina il messaggio o genera un'eccezione. La tabella seguente riepiloga il comportamento specifico:

Formato dei dati in entrata Formato del modello di filtro per le proprietà di dati Operazione risultante

Valido JSON

Valido JSON

Filtri Lambda in base ai criteri di filtro.

Valido JSON

Nessun modello di filtro per le proprietà dei dati

Filtri Lambda (solo sulle altre proprietà dei metadati) in base ai criteri di filtro.

Valido JSON

Non- JSON

Lambda genera un'eccezione al momento della creazione o dell'aggiornamento della mappatura dell'origine evento. Il modello di filtro per le proprietà dei dati deve essere in un JSON formato valido.

Non- JSON

Valido JSON

Lambda rilascia il registro.

Non- JSON

Nessun modello di filtro per le proprietà dei dati

Filtri Lambda (solo sulle altre proprietà dei metadati) in base ai criteri di filtro.

Non- JSON

Non- JSON

Lambda genera un'eccezione al momento della creazione o dell'aggiornamento della mappatura dell'origine evento. Il modello di filtro per le proprietà dei dati deve essere in un JSON formato valido.

Filtraggio dei record aggregati Kinesis

Con Kinesis, puoi aggregare più record in un unico record flusso di dati Kinesis per aumentare la velocità di trasmissione effettiva dei dati. Lambda può applicare criteri di filtraggio ai record aggregati solo quando si utilizza il fan-out avanzato di Kinesis. Il filtraggio dei record aggregati con Kinesis standard non è supportato. Quando utilizzi il fan-out avanzato, configuri un consumatore Kinesis a velocità di trasmissione effettiva dedicata che funga da trigger per la funzione Lambda. Lambda filtra quindi i record aggregati e passa solo i record che soddisfano i criteri di filtraggio.

Per saperne di più sull'aggregazione dei record di Kinesis, consulta la sezione Aggregazione della pagina Kinesis Producer Library () Key Concepts. KPL Per ulteriori informazioni sull'utilizzo di Lambda con il fan-out avanzato di Kinesis, consulta Incrementare le prestazioni di elaborazione dei flussi in tempo reale con Amazon Kinesis Data Streams Enhanced fan-out e Lambda sul blog di elaborazione. AWS AWS