Partizionamento dinamico in Amazon Data Firehose - Amazon Data Firehose

La distribuzione di stream Amazon Data Firehose ad Apache Iceberg Tables in Amazon S3 è in anteprima ed è soggetta a modifiche.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Partizionamento dinamico in Amazon Data Firehose

Il partizionamento dinamico consente di partizionare continuamente i dati in streaming in Firehose utilizzando chiavi all'interno dei dati (ad esempio, customer_id otransaction_id) e quindi fornire i dati raggruppati da queste chiavi nei prefissi Amazon Simple Storage Service (Amazon S3) corrispondenti. Ciò semplifica l'esecuzione di analisi ad alte prestazioni ed economiche sui dati in streaming in Amazon S3 utilizzando vari servizi come Amazon Athena, Amazon, Amazon EMR Redshift Spectrum e Amazon. QuickSight Inoltre, AWS Glue può eseguire processi di estrazione, trasformazione e caricamento (ETL) più sofisticati dopo che i dati di streaming partizionati dinamicamente sono stati consegnati ad Amazon S3, in casi d'uso in cui è richiesta un'ulteriore elaborazione.

Il partizionamento dei dati riduce al minimo la quantità di dati scansionati, ottimizza le prestazioni e riduce i costi delle query di analisi su Amazon S3. Inoltre, aumenta l'accesso granulare ai dati. I flussi Firehose vengono tradizionalmente utilizzati per acquisire e caricare dati in Amazon S3. Per partizionare un set di dati in streaming per l'analisi basata su Amazon S3, è necessario eseguire il partizionamento di applicazioni tra i bucket Amazon S3 prima di rendere i dati disponibili per l'analisi, operazione che potrebbe diventare complicata o costosa.

Con il partizionamento dinamico, Firehose raggruppa continuamente i dati in transito utilizzando chiavi dati definite dinamicamente o staticamente e fornisce i dati ai singoli prefissi Amazon S3 per chiave. time-to-insight Ciò si riduce di minuti o ore. Inoltre, riduce i costi e semplifica le architetture.

Lavora con le chiavi di partizionamento

Con il partizionamento dinamico, crei set di dati mirati dai dati S3 in streaming partizionandoli in base alle chiavi di partizionamento. Le chiavi di partizionamento consentono di filtrare i dati in streaming in base a valori specifici. Ad esempio, se è necessario filtrare i dati in base all'ID cliente e al paese, è possibile specificare il campo dati customer_id come una chiave di partizionamento e il campo dati country come un'altra chiave di partizionamento. Quindi, specificare le espressioni (utilizzando i formati supportati) per definire i prefissi dei bucket S3 a cui devono essere distribuiti i record di dati partizionati in modo dinamico.

Di seguito sono riportati i metodi supportati per la creazione di chiavi di partizionamento:

  • Analisi in linea: questo metodo utilizza il meccanismo di supporto integrato di Firehose, un parser jq, per estrarre le chiavi per il partizionamento dai record di dati in formato. JSON Attualmente jq 1.6 supportiamo solo la versione.

  • AWS Funzione Lambda: questo metodo utilizza una funzione AWS Lambda specificata per estrarre e restituire i campi dati necessari per il partizionamento.

Importante

Quando abiliti il partizionamento dinamico, devi configurare almeno uno di questi metodi per partizionare i dati. Puoi configurare uno di questi metodi per specificare le chiavi di partizionamento o entrambi contemporaneamente.

Creazione di chiavi di partizionamento con analisi in linea

Per configurare l'analisi in linea come metodo di partizionamento dinamico per i dati di streaming, è necessario scegliere i parametri del record di dati da utilizzare come chiavi di partizionamento e fornire un valore per ogni chiave di partizionamento specificata.

Il seguente record di dati di esempio mostra come definire le relative chiavi di partizionamento con l'analisi in linea. Nota che i dati devono essere codificati nel formato Base64. Puoi anche fare riferimento all'esempio. CLI

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

Ad esempio, puoi scegliere di partizionare i dati in base al parametro customer_id o al parametro event_timestamp. Ciò significa che desideri che il valore del parametro customer_id o del parametro event_timestamp in ogni record venga utilizzato per determinare il prefisso S3 a cui deve essere distribuito il record. Puoi anche scegliere un parametro nidificato, ad esempio device con un'espressione .type.device. La logica di partizionamento dinamico può dipendere da più parametri.

Dopo aver selezionato i parametri dei dati per le chiavi di partizionamento, mappa ogni parametro a un'espressione jq valida. La tabella seguente mostra una tale mappatura dei parametri alle espressioni jq:

Parametro espressione jq
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

In fase di esecuzione, Firehose utilizza la colonna destra in alto per valutare i parametri in base ai dati di ogni record.

Crea chiavi di partizionamento con una funzione Lambda AWS

Per i record di dati compressi o crittografati o i dati in qualsiasi formato di file diverso da quelloJSON, puoi utilizzare la funzione AWS Lambda integrata con il tuo codice personalizzato per decomprimere, decrittografare o trasformare i record per estrarre e restituire i campi dati necessari per il partizionamento. Si tratta di un'espansione della funzione di trasformazione Lambda esistente oggi disponibile con Firehose. Puoi quindi trasformare, analizzare e restituire i campi di dati da utilizzare quindi per il partizionamento dinamico usando la stessa funzione Lambda.

Di seguito è riportato un esempio di funzione Lambda di elaborazione del flusso Firehose in Python che riproduce ogni record letto dall'input all'output ed estrae le chiavi di partizionamento dai record.

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

Di seguito è riportato un esempio di funzione Lambda di elaborazione del flusso Firehose in Go che riproduce ogni record letto dall'input all'output ed estrae le chiavi di partizionamento dai record.

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

Usa il prefisso del bucket Amazon S3

Quando crei uno stream Firehose che utilizza Amazon S3 come destinazione, devi specificare un bucket Amazon S3 a cui Firehose deve fornire i tuoi dati. I prefissi del bucket Amazon S3 vengono utilizzati per organizzare i dati archiviati nei bucket Amazon S3. Un prefisso del bucket Amazon S3 è simile a una directory che consente di raggruppare oggetti simili.

Con il partizionamento dinamico, i dati partizionati vengono distribuiti nei prefissi Amazon S3 specificati. Se non abiliti il partizionamento dinamico, è facoltativo specificare un prefisso del bucket S3 per il flusso Firehose. Tuttavia, se si sceglie di abilitare il partizionamento dinamico, è necessario specificare i prefissi dei bucket S3 a cui Firehose fornisce i dati partizionati.

In ogni flusso Firehose in cui è abilitato il partizionamento dinamico, il valore del prefisso del bucket S3 è costituito da espressioni basate sulle chiavi di partizionamento specificate per quel flusso Firehose. Utilizzando nuovamente l'esempio di record di dati precedente, puoi creare il seguente valore di prefisso S3 che consiste in espressioni basate sulle chiavi di partizionamento definite sopra:

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Firehose valuta l'espressione precedente in fase di esecuzione. Raggruppa i record che corrispondono alla stessa espressione di prefisso S3 valutata in un unico set di dati. Firehose invia quindi ogni set di dati al prefisso S3 valutato. La frequenza di consegna del set di dati a S3 è determinata dall'impostazione del buffer di flusso Firehose. Di conseguenza, il record in questo esempio viene distribuito alla seguente chiave oggetto S3:

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

Per il partizionamento dinamico, è necessario utilizzare il seguente formato di espressione nel prefisso del bucket S3: !{namespace:value}, dove lo spazio dei nomi può essere partitionKeyFromQuery, partitionKeyFromLambda o entrambi. Se si utilizza l'analisi in linea per creare le chiavi di partizionamento per i dati di origine, è necessario specificare un valore del prefisso del bucket S3 costituito da espressioni specificate nel seguente formato: "partitionKeyFromQuery:keyID". Se si utilizza una funzione AWS Lambda per creare chiavi di partizionamento per i dati di origine, è necessario specificare un valore di prefisso del bucket S3 costituito da espressioni specificate nel seguente formato: "partitionKeyFromLambda:keyID".

Nota

È inoltre possibile specificare il valore del prefisso del bucket S3 utilizzando il formato in stile hive, ad esempio customer_id=! partitionKeyFrom{query:customer_id}.

Per ulteriori informazioni, consulta «Scegli Amazon S3 per la tua destinazione» in Creazione di uno stream Amazon Firehose e prefissi personalizzati per oggetti Amazon S3.

Applica il partizionamento dinamico ai dati aggregati

È possibile applicare il partizionamento dinamico ai dati aggregati (ad esempio, più eventi, registri o record aggregati in un'unica PutRecord PutRecordBatch API chiamata), ma questi dati devono prima essere disaggregati. È possibile disaggregare i dati abilitando la deaggregazione di più record, il processo di analisi dei record nel flusso Firehose e la loro separazione.

La disaggregazione di più record può essere di JSON tipo diverso, il che significa che la separazione dei record si basa su oggetti consecutivi. JSON La disaggregazione può anche essere del tipoDelimited, vale a dire che la separazione dei record viene eseguita sulla base di un delimitatore personalizzato specificato. Questo delimitatore personalizzato deve essere una stringa con codifica in base 64. Ad esempio, se si desidera utilizzare la stringa seguente come delimitatore personalizzato, è necessario specificarla nel formato codificato base-64####, che la traduce in. IyMjIw==

Nota

Quando disaggregate JSON i record, assicuratevi che l'input sia ancora presentato nel formato supportato. JSON JSONgli oggetti devono trovarsi su una sola riga senza delimitatori o solo delimitatori di nuova riga (). JSONL Una matrice di JSON oggetti non è un input valido.

Questi sono esempi di input corretto: {"a":1}{"a":2} and {"a":1}\n{"a":2}

Questo è un esempio di immissione errata: [{"a":1}, {"a":2}]

Con i dati aggregati, quando si abilita il partizionamento dinamico, Firehose analizza i record e cerca JSON oggetti validi o record delimitati all'interno di ogni API chiamata in base al tipo di deaggregazione multi-record specificato.

Importante

Se i dati sono aggregati, il partizionamento dinamico può essere applicato solo se i dati vengono prima disaggregati.

Importante

Quando si utilizza la funzionalità di trasformazione dei dati in Firehose, la deaggregazione verrà applicata prima della trasformazione dei dati. I dati che entrano in Firehose verranno elaborati nel seguente ordine: Deaggregazione → Trasformazione dei dati tramite Lambda → Chiavi di partizionamento.

Aggiungi un nuovo delimitatore di riga quando invii dati a S3

Puoi abilitare New Line Delimiter per aggiungere un nuovo delimitatore di riga tra i record negli oggetti che vengono consegnati ad Amazon S3. Ciò può essere utile per analizzare gli oggetti in Amazon S3. Ciò è particolarmente utile anche quando il partizionamento dinamico viene applicato a dati aggregati, poiché la deaggregazione multi-record (che deve essere applicata ai dati aggregati prima di poter essere partizionati dinamicamente) rimuove nuove righe dai record come parte del processo di analisi.

Abilita il partizionamento dinamico

Puoi configurare il partizionamento dinamico per i tuoi flussi Firehose tramite la Console di gestione Amazon Data Firehose o il. CLI APIs

Importante

È possibile abilitare il partizionamento dinamico solo quando si crea un nuovo flusso Firehose. Non è possibile abilitare il partizionamento dinamico per un flusso Firehose esistente per il quale il partizionamento dinamico non è già abilitato.

Per i passaggi dettagliati su come abilitare e configurare il partizionamento dinamico tramite la console di gestione Firehose durante la creazione di un nuovo flusso Firehose, consulta Creazione di un flusso Amazon Firehose. Quando devi specificare la destinazione per il tuo stream Firehose, assicurati di seguire i passaggi nella sezione Scegli Amazon S3 per la tua destinazione, poiché attualmente il partizionamento dinamico è supportato solo per i flussi Firehose che utilizzano Amazon S3 come destinazione.

Una volta abilitato il partizionamento dinamico su un flusso Firehose attivo, è possibile aggiornare la configurazione aggiungendo nuove chiavi di partizionamento o rimuovendo o aggiornando quelle esistenti e le espressioni del prefisso S3. Una volta aggiornato, Firehose inizia a utilizzare le nuove chiavi e le nuove espressioni di prefisso S3.

Importante

Una volta abilitato il partizionamento dinamico su un flusso Firehose, non può essere disabilitato su questo flusso Firehose.

Risolvi gli errori di partizionamento dinamico

Se Amazon Data Firehose non è in grado di analizzare i record di dati nel tuo flusso Firehose o non riesce a estrarre le chiavi di partizionamento specificate o a valutare le espressioni incluse nel valore del prefisso S3, questi record di dati vengono inviati al prefisso del bucket di errore S3 che devi specificare quando crei il flusso Firehose in cui abiliti il partizionamento dinamico. Il prefisso del bucket di errore S3 contiene tutti i record che Firehose non è in grado di inviare alla destinazione S3 specificata. Questi record sono organizzati in base al tipo di errore. Oltre al record, l'oggetto distribuito include anche informazioni sull'errore per facilitarne la comprensione e la risoluzione.

È necessario specificare un prefisso del bucket di errore S3 per un flusso Firehose se si desidera abilitare il partizionamento dinamico per questo flusso Firehose. Se non si desidera abilitare il partizionamento dinamico per un flusso Firehose, è facoltativo specificare un prefisso del bucket di errore S3.

Buffering dei dati e partizionamento dinamico

Amazon Data Firehose memorizza nel buffer i dati di streaming in entrata fino a una certa dimensione e per un determinato periodo di tempo prima di consegnarli alle destinazioni specificate. È possibile configurare la dimensione e l'intervallo del buffer durante la creazione di nuovi flussi Firehose o aggiornare la dimensione e l'intervallo del buffer sui flussi Firehose esistenti. La dimensione del buffer viene misurata in e l'intervallo del buffer viene misurato in secondi. MBs

Nota

La funzionalità Zero Buffering non è disponibile per il partizionamento dinamico.

Quando il partizionamento dinamico è abilitato, Firehose memorizza internamente i record che appartengono a una determinata partizione in base al suggerimento di buffering configurato (dimensione e ora) prima di consegnare questi record al bucket Amazon S3. Per fornire oggetti di dimensioni massime, Firehose utilizza internamente un buffering multistadio. Pertanto, il end-to-end ritardo di un batch di record potrebbe essere 1,5 volte il tempo di suggerimento per il buffering configurato. Ciò influisce sulla freschezza dei dati di un flusso Firehose.

Il conteggio delle partizioni attive corrisponde al numero totale di partizioni attive all'interno del buffer di distribuzione. Ad esempio, se la query di partizionamento dinamico costruisce 3 partizioni al secondo e disponi di una configurazione di suggerimento per il buffering che attiva la distribuzione ogni 60 secondi, in media si avranno 180 partizioni attive. Se Firehose non è in grado di consegnare i dati in una partizione a una destinazione, questa partizione viene contata come attiva nel buffer di consegna fino a quando non può essere consegnata.

Una nuova partizione viene creata quando un prefisso S3 viene valutato con un nuovo valore in base ai campi di dati del record e alle espressioni del prefisso S3. Un nuovo buffer viene creato per ogni partizione attiva. Ogni record successivo con lo stesso prefisso S3 valutato viene inviato a quel buffer.

Una volta che il buffer raggiunge il limite di dimensione del buffer o l'intervallo di tempo del buffer, Firehose crea un oggetto con i dati del buffer e lo invia al prefisso Amazon S3 specificato. Dopo la consegna dell'oggetto, il buffer per quella partizione e la partizione stessa vengono eliminati e rimossi dal conteggio delle partizioni attive.

Firehose fornisce ogni dato del buffer come un singolo oggetto una volta soddisfatte le dimensioni o l'intervallo del buffer per ciascuna partizione separatamente. Una volta che il numero di partizioni attive raggiunge il limite di 500 per flusso Firehose, il resto dei record del flusso Firehose viene inviato al prefisso del bucket di errore S3 specificato (). activePartitionExceeded Puoi utilizzare il modulo Amazon Data Firehose Limits per richiedere un aumento di questa quota fino a 5000 partizioni attive per un determinato flusso Firehose. Se sono necessarie più partizioni, è possibile creare più flussi Firehose e distribuire le partizioni attive su di essi.