Particionamento dinâmico no Kinesis Data Firehose
O particionamento dinâmico permite particionar continuamente os dados em streaming no Kinesis Data Firehose usando chaves dentro dos dados (por exemplocustomer_id
ou transaction_id
) e depois entregar os dados agrupados por essas chaves nos prefixos correspondentes do Amazon Simple Storage Service (Amazon S3). Isso facilita a execução de análises com eficiência de custos e alta performance em dados em streaming no Amazon S3 usando vários serviços, como o Amazon Athena, o Amazon EMR, o Amazon Redshift Spectrum e o Amazon QuickSight. Além disso, o AWS Glue pode realizar trabalhos mais sofisticados de extração, transformação e carregamento (ETL) depois que os dados em streaming particionados dinamicamente são entregues ao Amazon S3, nos casos de uso em que é necessário processamento adicional.
Particionar os dados minimiza a quantidade de dados digitalizados, otimiza a performance e reduz os custos de consultas de análise no Amazon S3. Também aumenta o acesso granular aos dados. Os fluxos de entrega do Kinesis Data Firehose são tradicionalmente usados para capturar e carregar dados no Amazon S3. Para particionar um conjunto de dados em streaming para análises baseadas no Amazon S3, você precisaria executar aplicações de particionamento entre buckets do Amazon S3 antes de disponibilizar os dados para análise, o que pode se tornar complicado ou caro.
Com o particionamento dinâmico, o Kinesis Data Firehose agrupa continuamente os dados em trânsito usando chaves de dados definidas de forma dinâmica ou estática e entrega os dados a prefixos individuais do Amazon S3 por chave. Isso reduz o tempo de obtenção de insights em minutos ou horas. Também reduz os custos e simplifica as arquiteturas.
Tópicos
- Chaves de particionamento
- Prefixo de bucket do Amazon S3 para particionamento dinâmico
- Particionamento dinâmico de dados agregados
- Adicionar um novo delimitador de linha ao entregar dados ao S3
- Como habilitar o particionamento dinâmico
- Tratamento de erros de particionamento dinâmico
- Armazenamento em buffer de dados e particionamento dinâmico
Chaves de particionamento
Com o particionamento dinâmico, você cria conjuntos de dados direcionados a partir dos dados do S3 em streaming particionando os dados com base em chaves de particionamento. As chaves de particionamento permitem que você filtre os dados em streaming com base em valores específicos. Por exemplo, se você precisar filtrar os dados com base no ID do cliente e no país, poderá especificar o campo de dados de customer_id
como uma chave de particionamento e o campo de dados de country
como outra chave de particionamento. Em seguida, você especifica as expressões (usando os formatos compatíveis) para definir os prefixos de bucket do S3 aos quais os registros de dados particionados dinamicamente devem ser entregues.
Estes são os métodos aceitos para criar chaves de particionamento:
-
Análise em linha: esse método usa o mecanismo de suporte integrado do Amazon Kinesis Data Firehose, um analisador jq
, para extrair as chaves para particionamento dos registros de dados que estão no formato JSON. -
Função do AWSLambda: esse método usa uma função do AWS Lambda especificada para extrair e retornar os campos de dados necessários para o particionamento.
Importante
Ao habilitar o particionamento dinâmico, você deve configurar pelo menos um desses métodos para particionar os dados. Você pode configurar qualquer um desses métodos para especificar as chaves de particionamento ou ambos ao mesmo tempo.
Criar chaves de particionamento com análise em linha
Para configurar a análise em linha como o método de particionamento dinâmico para os dados em streaming, você deve escolher os parâmetros de registro de dados a serem usados como chaves de particionamento e fornecer um valor para cada chave de particionamento especificada.
Vamos examinar o seguinte exemplo de registro de dados e ver como você pode definir chaves de particionamento para ele com análise em linha:
{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }
Por exemplo, você pode escolher particionar os dados com base no parâmetro customer_id
ou no parâmetro event_timestamp
. Isso significa que você deseja que o valor do parâmetro customer_id
ou do parâmetro event_timestamp
em cada registro seja usado para determinar o prefixo do S3 ao qual o registro deve ser entregue. Você também pode escolher um parâmetro aninhado, como device
com uma expressão .type.device
. A lógica de particionamento dinâmico pode depender de vários parâmetros.
Depois de selecionar os parâmetros dos dados para as chaves de particionamento, você mapeia cada parâmetro para uma expressão jq válida. A tabela a seguir mostra esse mapeamento de parâmetros para expressões jq:
Parâmetro | Expressão jq |
---|---|
customer_id |
.customer_id |
device |
.type.device |
year |
.event_timestamp| strftime("%Y") |
month |
.event_timestamp| strftime("%m") |
day |
.event_timestamp| strftime("%d") |
hour |
.event_timestamp| strftime("%H") |
No runtime, o Kinesis Data Firehose usa a coluna direita acima para avaliar os parâmetros com base nos dados de cada registro.
Criar chaves de particionamento com uma função do AWS Lambda
Para registros de dados compactados ou criptografados, ou dados que estejam em qualquer formato de arquivo que não seja JSON, você pode usar a função do AWS Lambda integrada com seu próprio código personalizado para descompactar, descriptografar ou transformar os registros a fim de extrair e retornar os campos de dados necessários para o particionamento. Essa é uma expansão da função do Lambda de transformação existente que está disponível atualmente com o Kinesis Data Firehose. Você pode transformar, analisar e retornar os campos de dados que podem ser usados para particionamento dinâmico usando a mesma função do Lambda.
Veja a seguir um exemplo da função do Lambda de processamento de fluxo de entrega do Amazon Kinesis Firehose em Python que reproduz cada registro lido da entrada na saída e extrai as chaves de particionamento dos registros.
from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output
Veja a seguir um exemplo da função do Lambda de processamento de fluxo de entrega do Amazon Kinesis Firehose em Go que reproduz cada registro lido da entrada na saída e extrai as chaves de particionamento dos registros.
package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type KinesisFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.KinesisFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.KinesisFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.KinesisFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.KinesisFirehoseResponseRecordMetadata var recordData KinesisFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }
Prefixo de bucket do Amazon S3 para particionamento dinâmico
Ao criar um fluxo de entrega que usa o Amazon S3 como destino, você deve especificar um bucket do Amazon S3 ao qual o Kinesis Data Firehose deve entregar os dados. Os prefixos de bucket do Amazon S3 são usados para organizar os dados armazenados nos buckets do S3. Um prefixo de bucket do Amazon S3 é semelhante a um diretório que permite agrupar objetos semelhantes.
Com o particionamento dinâmico, os dados particionados são entregues nos prefixos especificados do Amazon S3. Se você não habilitar o particionamento dinâmico, a especificação de um prefixo de bucket do S3 para o fluxo de entrega é opcional. Porém, se você escolher habilitar o particionamento dinâmico, DEVERÁ especificar os prefixos de bucket do S3 para os quais o Kinesis Data Firehose deverá fornecer dados particionados.
Em cada fluxo de entrega em que você habilita o particionamento dinâmico, o valor do prefixo de bucket do S3 consiste em expressões com base nas chaves de particionamento especificadas para esse fluxo de entrega. Usando novamente o exemplo de registro de dados acima, você pode criar o seguinte valor de prefixo do S3 que consiste em expressões com base nas chaves de particionamento definidas acima:
"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }
O Kinesis Data firehose avalia a expressão acima no runtime. Ele agrupa os registros que correspondem à mesma expressão de prefixo S3 avaliada para um único conjunto de dados. Em seguida, o Kinesis Data Firehose entrega cada conjunto de dados ao prefixo S3 avaliado. A frequência de entrega do conjunto de dados para o S3 é determinada pela configuração do buffer de fluxo de entrega. Assim sendo, o registro neste exemplo é entregue à seguinte chave de objeto do S3:
s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa
Para o particionamento dinâmico, você deve usar o seguinte formato de expressão no prefixo de bucket do S3: !{namespace:value}
, em que o namespace pode ser partitionKeyFromQuery
, partitionKeyFromLambda
ou ambos. Se estiver usando análise em linha para criar as chaves de particionamento para os dados da fonte, você deverá especificar um valor de prefixo de bucket do S3 consistindo em expressões especificadas no seguinte formato: "partitionKeyFromQuery:keyID"
. Se estiver usando função do AWS Lambda para criar as chaves de particionamento para os dados da fonte, você deverá especificar um valor de prefixo de bucket de S3 que consista em expressões especificadas no seguinte formato: "partitionKeyFromLambda:keyID"
.
nota
Você também pode especificar o valor do prefixo de bucket do S3 usando o formato no estilo hive, customer_id=!{partitionKeyFromQuery:customer_id}.
Para obter mais informações, consulte "Escolher o Amazon S3 como destino" em Criar de um fluxo de entrega do Amazon Kinesis Data Firehose e Prefixos personalizados para objetos do Amazon S3.
Particionamento dinâmico de dados agregados
Você pode aplicar o particionamento dinâmico aos dados agregados (por exemplo, vários eventos, logs ou registros agregados em uma única chamada de API PutRecord
e PutRecordBatch
), mas esses dados devem primeiro ser desagregados. Você pode desagregar os dados habilitando a desagregação de vários registros, ou seja, o processo de analisar os registros no fluxo de entrega e separá-los. A desagregação de vários registros pode ser do tipo JSON
, o que significa que a separação dos registros é realizada com base no JSON válido. Ou pode ser do tipo Delimited
, o que significa que a separação dos registros é realizada com base em um delimitador personalizado especificado. Esse delimitador personalizado deve ser uma string codificada na base 64. Por exemplo, se quiser usar a string a seguir como seu delimitador personalizado ####
, você deverá especificá-la no formato codificado na base 64, ou seja, IyMjIw==
.
Com dados agregados, quando você habilita o particionamento dinâmico, o Kinesis Data Firehose analisa os registros e procura objetos JSON válidos ou registros delimitados em cada chamada de API com base no tipo de desagregação de vários registros especificado.
Importante
Se os dados forem agregados, o particionamento dinâmico só poderá ser aplicado se os dados primeiro forem desagregados.
Importante
Quando você usa o atributo de transformação de dados no Kinesis Data Firehose, a desagregação é aplicada antes da transformação de dados. Os dados que chegam ao Kinesis Data Firehose são processados na seguinte ordem: Desagregação → Transformação de dados via Lambda → Chaves de particionamento.
Adicionar um novo delimitador de linha ao entregar dados ao S3
Ao habilitar o particionamento dinâmico, você pode configurar o fluxo de entrega para adicionar um novo delimitador de linha entre os registros nos objetos que são entregues ao Amazon S3. Isso pode ser útil para analisar objetos no Amazon S3. Isso também é particularmente útil quando o particionamento dinâmico é aplicado a dados agregados porque a desagregação de vários registros (que deve ser aplicada aos dados agregados antes que possam ser particionados dinamicamente) remove as linhas novas dos registros como parte do processo de análise.
Como habilitar o particionamento dinâmico
Você pode configurar o particionamento dinâmico dos fluxos de entrega por meio do console de gerenciamento, da CLI ou das APIs do Kinesis Data Firehose.
Importante
Você só pode habilitar o particionamento dinâmico ao criar um novo fluxo de entrega. Você não pode habilitar o particionamento dinâmico em um fluxo de entrega existente que não tenha o particionamento dinâmico já habilitado.
Para ver as etapas detalhadas de como habilitar e configurar o particionamento dinâmico usando o console de gerenciamento do Amazon Kinesis Data Firehose ao criar um novo fluxo de entrega, consulte Criar um fluxo de entrega do Amazon Kinesis Data Firehose. Quando for especificar o destino do fluxo de entrega, certifique-se de seguir as etapas na seção Escolher o Amazon S3 como destino, pois atualmente o particionamento dinâmico só é compatível com os fluxos de entrega que usam o Amazon S3 como destino.
Depois que o particionamento dinâmico é habilitado em um fluxo de entrega ativo, você pode atualizar a configuração adicionando, removendo ou atualizando chaves de particionamento e expressões de prefixo do S3. Depois de atualizado, o Amazon Kinesis Data Firehose começa a usar as novas chaves e as novas expressões de prefixo do S3.
Importante
Depois que você habilita o particionamento dinâmico em um fluxo de entrega, ele não pode ser desabilitado nesse fluxo de entrega.
Tratamento de erros de particionamento dinâmico
Se o Kinesis Data Firehose não puder analisar registros de dados no fluxo de entrega ou não conseguir extrair as chaves de particionamento especificadas ou avaliar as expressões incluídas no valor do prefixo do S3, esses registros de dados serão entregues ao prefixo de bucket de erros do S3 que você deve especificar ao criar o fluxo de entrega no qual habilita o particionamento dinâmico. O prefixo de bucket de erros do S3 contém todos os registros que o Kinesis Data Firehose não consegue entregar ao destino especificado do S3. Esses registros são organizados de acordo com o tipo de erro. Junto com o registro, o objeto entregue também inclui informações sobre o erro para ajudar a entender e resolver esse erro.
Você DEVE especificar um prefixo de bucket de erros do S3 para um fluxo de entrega se quiser habilitar o particionamento dinâmico para esse fluxo de entrega. Se você não quiser habilitar o particionamento dinâmico para um fluxo de entrega, a especificação de um prefixo de bucket de erros do S3 é opcional.
Armazenamento em buffer de dados e particionamento dinâmico
O Amazon Kinesis Data Firehose armazena em buffer os dados em streaming recebidos até um determinado tamanho e por um determinado período antes de entregá-los aos destinos especificados. Você pode configurar o tamanho do buffer e o intervalo do buffer ao criar novos fluxos de entrega ou atualizar o tamanho do buffer e o intervalo do buffer nos fluxos de entrega existentes. O tamanho do buffer é medido em MBs e o intervalo do buffer é medido em segundos.
Quando o particionamento dinâmico está habilitado, o Kinesis Data Firehose armazena internamente os registros que pertencem a uma determinada partição com base na sugestão de buffer configurada (tamanho e tempo) antes de entregar esses registros ao bucket do Amazon S3. Para entregar objetos com o tamanho máximo, o Kinesis Data Firehose usa o armazenamento em buffer de vários estágios internamente. Portanto, o atraso de ponta a ponta de um lote de registros pode ser 1,5 vezes o tempo de sugestão de buffer configurado. Isso afeta a atualização dos dados de um fluxo de entrega.
A quantidade de partições ativas é o número total de partições ativas dentro do buffer de entrega. Por exemplo, se a consulta de particionamento dinâmico monta 3 partições por segundo e você tiver uma configuração de sugestão de buffer que aciona a entrega a cada 60 segundos, então, em média, você teria 180 partições ativas. Se o Kinesis Data Firehose não puder entregar os dados em uma partição para um destino, essa partição será contada como ativa no buffer de entrega até que possa ser entregue.
Uma nova partição é criada quando um prefixo do S3 é avaliado como um novo valor com base nos campos de dados do registro e nas expressões do prefixo do S3. Um novo buffer é criado para cada partição ativa. Cada registro subsequente com o mesmo prefixo S3 avaliado é entregue a esse buffer. Quando o buffer chega ao seu limite de tamanho ou ao fim de seu intervalo de tempo, o Amazon Kinesis Data Firehose cria um objeto com os dados do buffer e o entrega ao prefixo especificado do Amazon S3. Depois que o objeto é entregue, o buffer da partição e a própria partição são excluídos e removidos da contagem de partições ativas. O Amazon Kinesis Data Firehose entrega cada dado do buffer como um único objeto quando o tamanho ou o intervalo do buffer são atingidos para cada partição separadamente. Quando o número de partições ativas atinge o limite de 500 por fluxo de entrega, o restante dos registros no fluxo de entrega é entregue ao prefixo especificado do bucket de erros do S3.