Particionamento dinâmico no Amazon Data Firehose - Amazon Data Firehose

O Amazon Data Firehose era conhecido anteriormente como Amazon Kinesis Data Firehose

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Particionamento dinâmico no Amazon Data Firehose

O particionamento dinâmico permite particionar continuamente dados de streaming no Firehose usando chaves dentro dos dados (por exemplo, customer_id outransaction_id) e depois entregar os dados agrupados por essas chaves nos prefixos correspondentes do Amazon Simple Storage Service (Amazon S3). Isso facilita a execução de análises econômicas e de alto desempenho em dados de streaming no Amazon S3 usando vários serviços, como Amazon Athena, Amazon EMR, Amazon Redshift Spectrum e Amazon. QuickSight Além disso, o AWS Glue pode realizar trabalhos mais sofisticados de extração, transformação e carregamento (ETL) depois que os dados de streaming particionados dinamicamente são entregues ao Amazon S3, em casos de uso em que é necessário processamento adicional.

Particionar os dados minimiza a quantidade de dados digitalizados, otimiza a performance e reduz os custos de consultas de análise no Amazon S3. Também aumenta o acesso granular aos dados. Os streams Firehose são tradicionalmente usados para capturar e carregar dados no Amazon S3. Para particionar um conjunto de dados em streaming para análises baseadas no Amazon S3, você precisaria executar aplicações de particionamento entre buckets do Amazon S3 antes de disponibilizar os dados para análise, o que pode se tornar complicado ou caro.

Com o particionamento dinâmico, o Firehose agrupa continuamente dados em trânsito usando chaves de dados definidas de forma dinâmica ou estática e entrega os dados para prefixos individuais do Amazon S3 por chave. Isso reduz time-to-insight em minutos ou horas. Também reduz os custos e simplifica as arquiteturas.

Chaves de particionamento

Com o particionamento dinâmico, você cria conjuntos de dados direcionados a partir dos dados do S3 em streaming particionando os dados com base em chaves de particionamento. As chaves de particionamento permitem que você filtre os dados em streaming com base em valores específicos. Por exemplo, se você precisar filtrar os dados com base no ID do cliente e no país, poderá especificar o campo de dados de customer_id como uma chave de particionamento e o campo de dados de country como outra chave de particionamento. Em seguida, você especifica as expressões (usando os formatos compatíveis) para definir os prefixos de bucket do S3 aos quais os registros de dados particionados dinamicamente devem ser entregues.

Estes são os métodos aceitos para criar chaves de particionamento:

  • Análise embutida - esse método usa o mecanismo de suporte integrado do Firehose, um analisador jq, para extrair as chaves para particionamento de registros de dados que estão no formato JSON. Atualmente, oferecemos suporte apenas à jq 1.6 versão.

  • AWS Função Lambda - esse método usa uma função AWS Lambda especificada para extrair e retornar os campos de dados necessários para o particionamento.

Importante

Ao habilitar o particionamento dinâmico, você deve configurar pelo menos um desses métodos para particionar os dados. Você pode configurar qualquer um desses métodos para especificar as chaves de particionamento ou ambos ao mesmo tempo.

Criar chaves de particionamento com análise em linha

Para configurar a análise em linha como o método de particionamento dinâmico para os dados em streaming, você deve escolher os parâmetros de registro de dados a serem usados como chaves de particionamento e fornecer um valor para cada chave de particionamento especificada.

O exemplo de registro de dados a seguir mostra como você pode definir chaves de particionamento para ele com análise embutida. Observe que os dados devem ser codificados no formato Base64. Você também pode consultar o exemplo da CLI.

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

Por exemplo, você pode escolher particionar os dados com base no parâmetro customer_id ou no parâmetro event_timestamp. Isso significa que você deseja que o valor do parâmetro customer_id ou do parâmetro event_timestamp em cada registro seja usado para determinar o prefixo do S3 ao qual o registro deve ser entregue. Você também pode escolher um parâmetro aninhado, como device com uma expressão .type.device. A lógica de particionamento dinâmico pode depender de vários parâmetros.

Depois de selecionar os parâmetros dos dados para as chaves de particionamento, você mapeia cada parâmetro para uma expressão jq válida. A tabela a seguir mostra esse mapeamento de parâmetros para expressões jq:

Parâmetro Expressão jq
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

Em tempo de execução, o Firehose usa a coluna direita acima para avaliar os parâmetros com base nos dados de cada registro.

Criar chaves de particionamento com uma função do AWS Lambda

Para registros de dados compactados ou criptografados, ou dados que estejam em qualquer formato de arquivo que não seja JSON, você pode usar a função AWS Lambda integrada com seu próprio código personalizado para descompactar, descriptografar ou transformar os registros a fim de extrair e retornar os campos de dados necessários para o particionamento. Essa é uma expansão da função Lambda de transformação existente que está disponível atualmente com o Firehose. Você pode transformar, analisar e retornar os campos de dados que podem ser usados para particionamento dinâmico usando a mesma função do Lambda.

Veja a seguir um exemplo da função Lambda de processamento de stream do Firehose em Python que reproduz cada registro lido da entrada à saída e extrai as chaves de particionamento dos registros.

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

Veja a seguir um exemplo da função Lambda de processamento de stream do Firehose em Go que reproduz cada registro lido da entrada à saída e extrai as chaves de particionamento dos registros.

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

Prefixo de bucket do Amazon S3 para particionamento dinâmico

Ao criar um stream do Firehose que usa o Amazon S3 como destino, você deve especificar um bucket do Amazon S3 onde o Firehose deve entregar seus dados. Os prefixos de bucket do Amazon S3 são usados para organizar os dados armazenados nos buckets do S3. Um prefixo de bucket do Amazon S3 é semelhante a um diretório que permite agrupar objetos semelhantes.

Com o particionamento dinâmico, os dados particionados são entregues nos prefixos especificados do Amazon S3. Se você não habilitar o particionamento dinâmico, especificar um prefixo de bucket do S3 para seu stream do Firehose é opcional. No entanto, se você optar por ativar o particionamento dinâmico, deverá especificar os prefixos de bucket do S3 para os quais o Firehose entrega dados particionados.

Em cada stream do Firehose em que você ativa o particionamento dinâmico, o valor do prefixo do bucket do S3 consiste em expressões com base nas chaves de particionamento especificadas para esse stream do Firehose. Usando novamente o exemplo de registro de dados acima, você pode criar o seguinte valor de prefixo do S3 que consiste em expressões com base nas chaves de particionamento definidas acima:

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

O Firehose avalia a expressão acima em tempo de execução. Ele agrupa os registros que correspondem à mesma expressão de prefixo S3 avaliada para um único conjunto de dados. O Firehose então entrega cada conjunto de dados ao prefixo S3 avaliado. A frequência de entrega do conjunto de dados para o S3 é determinada pela configuração do buffer de fluxo do Firehose. Assim sendo, o registro neste exemplo é entregue à seguinte chave de objeto do S3:

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

Para o particionamento dinâmico, você deve usar o seguinte formato de expressão no prefixo de bucket do S3: !{namespace:value}, em que o namespace pode ser partitionKeyFromQuery, partitionKeyFromLambda ou ambos. Se estiver usando análise em linha para criar as chaves de particionamento para os dados da fonte, você deverá especificar um valor de prefixo de bucket do S3 consistindo em expressões especificadas no seguinte formato: "partitionKeyFromQuery:keyID". Se estiver usando função do AWS Lambda para criar as chaves de particionamento para os dados da fonte, você deverá especificar um valor de prefixo de bucket de S3 que consista em expressões especificadas no seguinte formato: "partitionKeyFromLambda:keyID".

nota

Você também pode especificar o valor do prefixo do bucket do S3 usando o formato de estilo hive, por exemplo customer_id=! {partitionKeyFromConsulta: customer_ID}.

Para obter mais informações, consulte “Escolha o Amazon S3 para seu destino” em Criação de um stream do Amazon Firehose e prefixos personalizados para objetos do Amazon S3.

Particionamento dinâmico de dados agregados

Você pode aplicar o particionamento dinâmico aos dados agregados (por exemplo, vários eventos, logs ou registros agregados em uma única chamada de API PutRecord e PutRecordBatch), mas esses dados devem primeiro ser desagregados. Você pode desagregar seus dados ativando a desagregação de vários registros, o processo de analisar os registros no stream do Firehose e separá-los.

A desagregação de vários registros pode ser do JSON tipo, o que significa que a separação dos registros é baseada em objetos JSON consecutivos. A desagregação também pode ser desse tipoDelimited, o que significa que a separação dos registros é realizada com base em um delimitador personalizado especificado. Esse delimitador personalizado deve ser uma string codificada na base 64. Por exemplo, se quiser usar a sequência de caracteres a seguir como seu delimitador personalizado####, você deve especificá-la no formato codificado em base 64, que a traduz para. IyMjIw==

nota

Ao desagregar registros JSON, certifique-se de que sua entrada ainda seja apresentada no formato JSON compatível. Os objetos JSON devem estar em uma única linha sem delimitador ou somente delimitados por nova linha (JSONL). Uma matriz de objetos JSON não é uma entrada válida.

Estes são exemplos de entrada correta: {"a":1}{"a":2} and {"a":1}\n{"a":2}

Este é um exemplo da entrada incorreta: [{"a":1}, {"a":2}]

Com dados agregados, quando você ativa o particionamento dinâmico, o Firehose analisa os registros e procura objetos JSON válidos ou registros delimitados em cada chamada de API com base no tipo de desagregação de vários registros especificado.

Importante

Se os dados forem agregados, o particionamento dinâmico só poderá ser aplicado se os dados primeiro forem desagregados.

Importante

Quando você usa o recurso de transformação de dados no Firehose, a desagregação será aplicada antes da transformação de dados. Os dados que chegam ao Firehose serão processados na seguinte ordem: Desagregação → Transformação de dados via Lambda → Chaves de particionamento.

Adicionar um novo delimitador de linha ao entregar dados ao S3

Você pode ativar o New Line Delimiter para adicionar um novo delimitador de linha entre registros em objetos que são entregues ao Amazon S3. Isso pode ser útil para analisar objetos no Amazon S3. Isso também é particularmente útil quando o particionamento dinâmico é aplicado a dados agregados porque a desagregação de vários registros (que deve ser aplicada aos dados agregados antes que possam ser particionados dinamicamente) remove novas linhas dos registros como parte do processo de análise.

Como habilitar o particionamento dinâmico

Você pode configurar o particionamento dinâmico para seus streams do Firehose por meio do Amazon Data Firehose Management Console, da CLI ou das APIs.

Importante

Você pode ativar o particionamento dinâmico somente ao criar um novo stream do Firehose. Você não pode ativar o particionamento dinâmico para um stream existente do Firehose que não tenha o particionamento dinâmico já ativado.

Para obter etapas detalhadas sobre como habilitar e configurar o particionamento dinâmico por meio do console de gerenciamento do Firehose ao criar um novo stream do Firehose, consulte Criação de um stream do Amazon Firehose. Ao concluir a tarefa de especificar o destino do seu stream do Firehose, certifique-se de seguir as etapas na seção Escolha o Amazon S3 para seu destino, pois atualmente, o particionamento dinâmico só é suportado para streams do Firehose que usam o Amazon S3 como destino.

Depois que o particionamento dinâmico em um stream ativo do Firehose estiver ativado, você poderá atualizar a configuração adicionando novas chaves de particionamento ou removendo ou atualizando as existentes e as expressões do prefixo S3. Depois de atualizado, o Firehose começa a usar as novas chaves e as novas expressões de prefixo do S3.

Importante

Depois de habilitar o particionamento dinâmico em um stream do Firehose, ele não pode ser desativado nesse stream do Firehose.

Tratamento de erros de particionamento dinâmico

Se o Amazon Data Firehose não conseguir analisar registros de dados em seu stream do Firehose ou não conseguir extrair as chaves de particionamento especificadas ou avaliar as expressões incluídas no valor do prefixo do S3, esses registros de dados serão entregues ao prefixo do bucket de erro do S3 que você deve especificar ao criar o stream do Firehose, no qual você ativa o particionamento dinâmico. O prefixo do bucket de erro do S3 contém todos os registros que o Firehose não consegue entregar ao destino especificado do S3. Esses registros são organizados de acordo com o tipo de erro. Junto com o registro, o objeto entregue também inclui informações sobre o erro para ajudar a entender e resolver esse erro.

Você deve especificar um prefixo de bucket de erro do S3 para um stream do Firehose se quiser ativar o particionamento dinâmico para esse stream do Firehose. Se você não quiser ativar o particionamento dinâmico para um stream do Firehose, especificar um prefixo de bucket de erro do S3 é opcional.

Armazenamento em buffer de dados e particionamento dinâmico

O Amazon Data Firehose armazena os dados de streaming recebidos em um determinado tamanho e por um determinado período de tempo antes de entregá-los aos destinos especificados. Você pode configurar o tamanho do buffer e o intervalo do buffer ao criar novos streams do Firehose ou atualizar o tamanho do buffer e o intervalo do buffer nos streams existentes do Firehose. O tamanho do buffer é medido em MBs e o intervalo do buffer é medido em segundos.

Quando o particionamento dinâmico está habilitado, o Firehose armazena internamente os registros que pertencem a uma determinada partição com base na dica de buffer configurada (tamanho e horário) antes de entregar esses registros ao seu bucket do Amazon S3. Para fornecer objetos de tamanho máximo, o Firehose usa o buffer de vários estágios internamente. Portanto, o end-to-end atraso de um lote de registros pode ser 1,5 vezes o tempo de dica de buffer configurado. Isso afeta a atualização dos dados de um stream do Firehose.

A quantidade de partições ativas é o número total de partições ativas dentro do buffer de entrega. Por exemplo, se a consulta de particionamento dinâmico monta 3 partições por segundo e você tiver uma configuração de sugestão de buffer que aciona a entrega a cada 60 segundos, então, em média, você teria 180 partições ativas. Se o Firehose não puder entregar os dados em uma partição para um destino, essa partição será considerada ativa no buffer de entrega até que possa ser entregue.

Uma nova partição é criada quando um prefixo do S3 é avaliado como um novo valor com base nos campos de dados do registro e nas expressões do prefixo do S3. Um novo buffer é criado para cada partição ativa. Cada registro subsequente com o mesmo prefixo S3 avaliado é entregue a esse buffer.

Quando o buffer atinge o limite de tamanho do buffer ou o intervalo de tempo do buffer, o Firehose cria um objeto com os dados do buffer e o entrega ao prefixo especificado do Amazon S3. Depois que o objeto é entregue, o buffer dessa partição e da própria partição são excluídos e removidos da contagem de partições ativas.

O Firehose entrega cada dado do buffer como um único objeto quando o tamanho ou o intervalo do buffer são atendidos para cada partição separadamente. Quando o número de partições ativas atinge o limite de 500 por stream do Firehose, o restante dos registros no stream do Firehose é entregue ao prefixo do bucket de erro do S3 especificado (). activePartitionExceeded Você pode usar o formulário Amazon Data Firehose Limits para solicitar um aumento dessa cota para até 5.000 partições ativas por determinado stream do Firehose. Se precisar de mais partições, você pode criar mais streams do Firehose e distribuir as partições ativas entre elas.