Usar a filtragem de eventos com uma origem de eventos do Kinesis - AWS Lambda

Usar a filtragem de eventos com uma origem de eventos do Kinesis

É possível usar filtragem de eventos para controlar quais registros de um stream ou fila que o Lambda enviará para a função. Para obter informações gerais sobre como a filtragem de eventos funciona, consulte Controlar quais eventos o Lambda envia para a função.

Esta seção tem como foco a filtragem de eventos para as origens de eventos do Kinesis.

Conceitos básicos da filtragem de eventos do Kinesis

Suponha que um produtor esteja inserindo dados formatados em JSON em seu fluxo de dados do Kinesis. Um exemplo de registro seria semelhante ao a seguir, com os dados JSON convertidos em uma string codificada em Base64 no campo data.

{ "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }

Desde que os dados que o produtor coloque no stream sejam JSON válido, é possível usar a filtragem de eventos para filtrar registros usando a chave data. Suponha que um produtor esteja inserindo dados em seu stream do Kinesis no formato JASON a seguir.

{ "record": 12345, "order": { "type": "buy", "stock": "ANYCO", "quantity": 1000 } }

Para filtrar somente os registros em que o tipo de pedido é “comprar”, o objeto FilterCriteria seria como a seguir.

{ "Filters": [ { "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }" } ] }

Para maior clareza, aqui está o valor de Pattern do filtro expandido em JSON simples.

{ "data": { "order": { "type": [ "buy" ] } } }

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

Console

Para adicionar esse filtro usando o console, siga as instruções em Anexar critérios de filtro a um mapeamento de fonte de eventos (console) e insira a string a seguir em Critérios do filtro.

{ "data" : { "order" : { "type" : [ "buy" ] } } }
AWS CLI

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
AWS SAM

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

FilterCriteria: Filters: - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'

Para filtrar corretamente eventos de origens do Kinesis, tanto o campo de dados como os critérios de filtro para o campo de dados devem estar em formato JSON válido. Se algum desses campos não estiver em um formato JSON válido, o Lambda descartará a mensagem ou emitirá uma exceção. A tabela a seguir resume o comportamento específico:

Formato dos dados recebidos Formato de filtro padrão para propriedades de dados Ação resultante

JSON válido

JSON válido

Filtros do Lambda com base em seus critérios de filtro.

JSON válido

Nenhum padrão de filtro para propriedades de dados

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.

JSON válido

Não JSON

O Lambda emite uma exceção no momento da criação ou atualização do mapeamento da fonte de eventos. O padrão de filtro para propriedades de dados deve estar em um formato JSON válido.

Não JSON

JSON válido

O Lambda descarta o registro.

Não JSON

Nenhum padrão de filtro para propriedades de dados

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.

Não JSON

Não JSON

O Lambda emite uma exceção no momento da criação ou atualização do mapeamento da fonte de eventos. O padrão de filtro para propriedades de dados deve estar em um formato JSON válido.

Filtragem de registros agregados do Kinesis

Com o Kinesis, é possível agregar vários registros em um único registro do Kinesis Data Streams para aumentar seu throughput. O Lambda pode aplicar critérios de filtro a registros agregados somente quando você usar a distribuição avançada do Kinesis. Não há suporte para a filtragem de registros agregados com o Kinesis padrão. Ao usar a distribuição avançada, você configura um consumidor de throughput dedicado do Kinesis para atuar como acionador para sua função do Lambda. Em seguida, o Lambda filtra os registros agregados e passa somente os registros que atendam aos seus critérios de filtragem.

Para saber mais sobre a agregação de registros do Kinesis, consulte a seção Agregação na página Conceitos principais da Kinesis Producer Library (KPL). Para saber mais sobre como usar o Lambda com a distribuição avançada do Kinesis, consulte Aumento do desempenho do processamento de streams em tempo real com a distribuição avançada do Amazon Kinesis Data Streams e o AWS Lambda no blog de computação da AWS.