Usar a filtragem de eventos com uma origem de eventos do Amazon MSK - AWS Lambda

Usar a filtragem de eventos com uma origem de eventos do Amazon MSK

É possível usar filtragem de eventos para controlar quais registros de um stream ou fila que o Lambda enviará para a função. Para obter informações gerais sobre como a filtragem de eventos funciona, consulte Controlar quais eventos o Lambda envia para a função.

Esta seção tem como foco a filtragem de eventos para as origens de eventos do Amazon MSK.

Conceitos básicos da filtragem de eventos do Amazon MSK

Suponha que um produtor esteja escrevendo mensagens para um tópico no cluster do Amazon MSK em um formato válido JSON ou como strings de texto simples. Um exemplo de registro seria semelhante ao a seguir, com a mensagem convertida em uma string codificada em Base64 no campo value.

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

Suponha que o produtor do Apache Kafka esteja escrevendo mensagens para o tópico no formato JSON a seguir.

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

É possível usar a chave value para filtrar registros. Suponha que você queira filtrar somente os registros onde device_ID comece com as letras AB. O objeto FilterCriteria seria como a seguir.

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

Para maior clareza, aqui está o valor de Pattern do filtro expandido em JSON simples.

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

Console

Para adicionar esse filtro usando o console, siga as instruções em Anexar critérios de filtro a um mapeamento de fonte de eventos (console) e insira a string a seguir em Critérios do filtro.

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

Com o Amazon MSK, você também pode filtrar registros quando a mensagem é uma string de texto simples. Suponha que você queira ignorar as mensagens em que a string seja “erro”. O objeto FilterCriteria seria como a seguir.

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

Para maior clareza, aqui está o valor de Pattern do filtro expandido em JSON simples.

{ "value": [ { "anything-but": [ "error" ] } ] }

É possível adicionar seu filtro usando o console, a AWS CLI ou um modelo do AWS SAM.

Console

Para adicionar esse filtro usando o console, siga as instruções em Anexar critérios de filtro a um mapeamento de fonte de eventos (console) e insira a string a seguir em Critérios do filtro.

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

Para criar um novo mapeamento da origem do evento com esses critérios de filtro usando a AWS Command Line Interface (AWS CLI), execute o comando a seguir.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

Para adicionar esses critérios de filtro a um mapeamento da origem do evento existente, execute o comando a seguir.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

Para adicionar esse filtro usando o AWS SAM, adicione o trecho a seguir ao modelo YAML da origem do evento.

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

As mensagens do Amazon MSK devem ser strings codificadas em UTF-8 de texto simples ou no formato JSON. Isso porque o Lambda decodifica as matrizes de bytes do Amazon MSK em UTF-8 antes de aplicar os critérios de filtragem. Se as mensagens usarem outra codificação, como UTF-16 ou ASCII, ou se o formato da mensagem não corresponder ao formato dos FilterCriteria, o Lambda processará somente os filtros de metadados. A tabela a seguir resume o comportamento específico:

Formato do da mensagem recebida Formato padrão de filtro para propriedades de mensagem Ação resultante

String simples

String simples

Filtros do Lambda com base em seus critérios de filtro.

String simples

Nenhum padrão de filtro para propriedades de dados

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.

String simples

JSON válido

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.

JSON válido

String simples

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.

JSON válido

Nenhum padrão de filtro para propriedades de dados

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.

JSON válido

JSON válido

Filtros do Lambda com base em seus critérios de filtro.

String não codificada em UTF-8

JSON, string de texto simples ou nenhum padrão

Filtros do Lambda (somente nas outras propriedades de metadados) com base nos seus critérios de filtro.