Kinesis Data Firehose での動的パーティショニング - Amazon Kinesis Data Firehose

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Data Firehose での動的パーティショニング

動的パーティショニングを使用すると、データ内のキーを使用して (たとえば、customer_id または transaction_id) Kinesis Data Firehose でストリーミングデータを継続的にパーティショニングし、これらのキーでグループ化されたデータを、対応する Amazon Simple Storage Service (Amazon S3) プレフィックスに配信できます。これにより、Amazon Athena、Amazon EMR、Amazon Redshift Spectrum、Amazon Redshift Spectrum、Amazon などのさまざまなサービスを使用して、Amazon S3 のストリーミングデータに対して高性能でコスト効率の高い分析を簡単に実行できるようになります QuickSight。また、AWS Glue は、追加の処理が必要なユースケースで、動的にパーティショニングされたストリーミングデータを Amazon S3 に配信した後、より高度な抽出、変換、ロード (ETL) ジョブを実行できます。

データをパーティショニングすることで、スキャンされるデータ量が最小限に抑えられ、パフォーマンスが最適化され、Amazon S3 での分析クエリのコストが削減されます。また、データへのきめ細かいアクセスも向上します。Kinesis Data Firehose 配信ストリームは、データをキャプチャして Amazon S3 にロードするために従来より使用されています。Amazon S3 ベースの分析用にストリーミングデータセットをパーティショニングするには、データを分析に使用できるようにする前に、Amazon S3 バケット間でパーティショニングアプリケーションを実行する必要がありますが、これは複雑になるか、費用がかかる場合があります。

動的パーティショニングでは、Kinesis Data Firehose は、動的または静的に定義されたデータキーを使用して送信中のデータを継続的にグループ化し、キーごとに個々の Amazon S3 プレフィックスにデータを配信します。これにより、 time-to-insight 数分または数時間短縮されます。また、コストを削減し、アーキテクチャを簡素化します。

パーティショニングキー

動的パーティショニングでは、パーティショニングキーに基づいてデータをパーティショニングすることで、ストリーミング S3 データからターゲットデータセットを作成します。パーティショニングキーを使用すると、特定の値に基づいてストリーミングデータをフィルタリングできます。たとえば、顧客 ID と国に基づいてデータをフィルタリングする必要がある場合は、1 つのパーティショニングキーとして customer_id のデータフィールドを、また別のパーティショニングキーとして country のデータフィールドを指定できます。次に、(サポートされている形式を使用して) 式を指定し、動的にパーティショニングされたデータレコードの配信先となる S3 バケットプレフィックスを定義します。

パーティショニングキーの作成でサポートされているメソッドは次のとおりです。

  • インライン解析 - このメソッドは、JSON 形式のデータレコードからパーティショニングするためのキーの抽出で、Amazon Kinesis Data Firehose 組み込みサポートメカニズムである jq パーサーを使用します。

  • AWS Lambda 関数 - このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します 。

重要

動的パーティショニングを有効にする場合、データをパーティショニングするには、これらのメソッドの少なくとも 1 つを設定する必要があります。これらのメソッドのいずれかを設定して、パーティショニングキーを指定することも、両方を同時に指定することもできます。

インライン解析によるパーティショニングキーの作成

ストリーミングデータの動的パーティショニングメソッドとしてインライン解析を設定するには、パーティショニングキーとして使用するデータレコードパラメータを選択し、それぞれの指定したパーティショニングキーの値を提供する必要があります。

次のサンプルデータレコードを見て、インライン解析でパーティショニングキーを定義する方法を確認しましょう。

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

たとえば、customer_id パラメータまたは event_timestamp パラメータに基づいてデータをパーティショニングすることを選択できます。これは、レコードが配信される S3 プレフィックスの決定に使用される各レコードの customer_id パラメータまたは event_timestamp パラメータの値が必要であることを意味します。また、式 .type.device を用いた device のように、ネストされたパラメータを選択することもできます。動的パーティショニングロジックは、複数のパラメータに依存する可能性があります。

パーティショニングキーのデータパラメータを選択した後、各パラメータを有効な jq 式にマップします。次のテーブルに、jq 式へのパラメータのマッピングを示します。

パラメータ ip 式
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

実行時に、Kinesis Data Firehose は上の右の列を使用して、各レコードのデータに基づいてパラメータを評価します。

AWS Lambda 関数を用いてパーティショニングキーを作成する

圧縮または暗号化されたデータレコード、または JSON 以外のファイル形式のデータについては、パーティショニングに必要なデータフィールドを抽出して返すために、レコードの解凍、復号化、または変換を行う独自のカスタムコードを持つ統合された AWS Lambda 関数を使用することができます。これは、Kinesis Data Firehose で現在利用できる既存の変換 Lambda 関数の拡張です。同じ Lambda 関数を使用して、動的パーティショニングに使用できるデータフィールドを変換、解析、および返すことができます。

以下は、入力から出力までのすべての読み取りレコードを再生し、レコードからパーティショニングキーを抽出する Python の Lambda 関数を処理する Amazon Kinesis Firehose 配信ストリームの例です。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下は、入力から出力までのすべての読み取りレコードを再生し、レコードからパーティショニングキーを抽出する Go の Lambda 関数を処理する Amazon Kinesis Firehose 配信ストリームの例です。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type KinesisFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.KinesisFirehoseEvent) (events.KinesisFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.KinesisFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.KinesisFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.KinesisFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.KinesisFirehoseResponseRecordMetadata var recordData KinesisFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

動的パーティショニングの Amazon S3 バケットプレフィックス

Amazon S3 を送信先として使用する配信ストリームを作成する場合は、Kinesis Data Firehose がデータを配信する Amazon S3 バケットを指定する必要があります。Amazon S3 バケットプレフィックスを使用して、S3 バケットに保存するデータを整理できます。Amazon S3 バケットプレフィックスは、類似オブジェクトをグループ化できるディレクトリと同様のものです。

動的パーティショニングでは、パーティショニングされたデータは、指定された Amazon S3 プレフィックスに配信されます。動的パーティショニングを有効にしない場合、配信ストリームの S3 バケットプレフィックスの指定はオプションです。ただし、動的パーティショニングを有効にする場合は、Kinesis Data Firehose がパーティショニングされたデータを配信する S3 バケットプレフィックスを指定する必要があります。

動的パーティショニングを有効にするすべての配信ストリームで、S3 バケットプレフィックス値は、その配信ストリームに対して指定されたパーティショニングキーに基づく式で構成されます。上記のデータレコードの例を再度使用して、上記で定義したパーティショニングキーに基づく式で構成される次の S3 プレフィックス値を構築できます。

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Kinesis Data Firehose は、実行時に上記の式を評価します。同じ評価された S3 プレフィックス式に一致するレコードを 1 つのデータセットにグループ化します。Kinesis Data Firehose は、評価された S3 プレフィックスに各データセットを配信します。S3 へのデータセット配信の頻度は、配信ストリームバッファ設定によって決まります。その結果、この例のレコードは次の S3 オブジェクトキーに配信されます。

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

動的パーティショニングでは、S3 バケットプレフィックスで次の式形式を使用する必要があります: !{namespace:value}。ここで、名前空間は partitionKeyFromQuery または partitionKeyFromLambda、またはその両方です。インライン解析を使用してソースデータのパーティショニングキーを作成している場合は、次の形式で指定された式で構成される S3 バケットプレフィクス値を指定する必要があります。"partitionKeyFromQuery:keyID"。 AWSLambda 関数を使用してソースデータのパーティショニングキーを作成している場合は、次の形式で指定された式で構成される S3 バケットプレフィクス値を指定する必要があります"partitionKeyFromLambda:keyID"

注記

また、Hive スタイルの形式 (customer_id=! など) を使用して S3 バケットプレフィクス値を指定することもできます。 {partitionKeyFromquery: Customer_ID}。

詳細については、Amazon Kinesis Data Firehose 配信ストリームの作成および Amazon S3 オブジェクトのカスタムプレフィックスの「配信先に Amazon S3 を選択」を参照してください。

集約データの動的パーティショニング

動的パーティショニングを集約データ (たとえば、複数のイベント、ログ、または単一の PutRecord および PutRecordBatch API コールに集約されたレコードなど) に適用できますが、このデータはまず集約解除する必要があります。マルチレコードの集約解除を有効にすることで、データを集約解除できます。これは、配信ストリーム内のレコードを解析して分離するプロセスです。マルチレコードの集約解除は、JSON タイプのいずれかにすることができます。これは、レコードの分離は有効な JSON に基づいて実行されることを意味します。あるいは、Delimited タイプにすることができます。これは、レコードの分離は指定されたカスタム区切り文字に基づいて実行されることを意味します。このカスタム区切り文字は base-64 でエンコードされた文字列である必要があります。たとえば、カスタム区切り記号 #### として次の文字列を使用する場合、base-64 のエンコード形式で指定する必要があります。これにより、IyMjIw== に変換されます。

集約データでは、動的パーティショニングを有効にすると、Kinesis Data Firehose はレコードを解析し、指定されたマルチレコードの集約解除タイプに基づいて、各 API コール内で有効な JSON オブジェクトまたは区切り付きレコードを検索します。

重要

データが集約されている場合、動的パーティショニングは、データが最初に集約解除された場合にのみ適用できます。

S3 にデータを配信するときに新しい行区切り文字を追加する

動的パーティションニングを有効にすると、Amazon S3 に配信されるオブジェクトのレコード間に改行区切り記号を追加するように配信ストリームを設定できます。これは、Amazon S3 のオブジェクトの解析に役立ちます。これは、動的パーティショニングが集約データに適用される場合にも特に便利です。それは、マルチレコードの集約解除 (動的にパーティション化する前に集約データに適用する必要があります) は、解析プロセスの一環としてレコードから新しい行を削除するからです。

動的パーティショニングを有効にする方法

Kinesis Data Firehose マネジメントコンソール、CLI、または API を使用して、配信ストリームの動的パーティショニングを設定できます。

重要

動的パーティショニングは、新しい配信ストリームを作成する場合にのみ有効にできます。動的パーティショニングがまだ有効になっていない既存の配信ストリームでは、動的パーティショニングを有効にすることはできません。

新しい配信ストリームの作成中に Amazon Kinesis Data Firehose マネジメントコンソールを使用して動的パーティショニングを有効にして設定する方法の詳細な手順については、「Amazon Kinesis Data Firehose 配信ストリームの作成」を参照してください。配信ストリームの送信先を指定するタスクに到達したら、「送信先にAmazon S3 を選択」セクションの手順に従ってください。現在、動的パーティショニングは、Amazon S3 を送信先として使用する配信ストリームでのみサポートされているからです。

アクティブな配信ストリームで動的パーティショニングを有効にすると、新しいパーティショニングキーと S3 プレフィックス式を追加するか、既存のそれらのキーと式を削除または更新することで、設定を更新できます。更新されると、Amazon Kinesis Data Firehose は新しいキーと新しい S3 プレフィックス式の使用を開始します。

重要

配信ストリームで動的パーティショニングを有効にすると、この配信ストリームでは無効にできません。

動的パーティショニングのエラー処理

Kinesis Data Firehose が配信ストリーム内のデータレコードを解析できない場合、または指定したパーティショニングキーを抽出できない場合、または S3 プレフィックス値に含まれる式を評価できない場合、これらのデータレコードは S3 エラーバケットプレフィックスに配信されます。このプレフィックスは、動的パーティショニングが有効にされる配信ストリームを作成するときに指定する必要があります。S3 エラーバケットプレフィックスには、Kinesis Data Firehose が指定された S3 送信先に配信できないすべてのレコードが含まれています。これらのレコードは、エラータイプに基づいて整理されます。レコードとともに、配信されたオブジェクトには、エラーの理解と解決に役立つエラーに関する情報も含まれます。

配信ストリームの動的パーティショニングを有効にするには、この配信ストリームに S3 エラーバケットプレフィックスを指定する必要があります。配信ストリームの動的パーティショニングを有効にしない場合は、S3 エラーバケットプレフィックスの指定はオプションです。

データバッファリングと動的パーティショニング

Amazon Kinesis Data Firehose は特定の期間、受信するストリーミングデータを特定のサイズにバッファしてから、指定の送信先に配信します。新しい配信ストリームの作成時にバッファサイズとバッファ間隔を設定したり、既存の配信ストリームのバッファサイズとバッファ間隔を更新したりできます。バッファサイズは MB 単位で、バッファ間隔は秒単位です。

動的パーティショニングが有効な場合、Kinesis Data Firehose は、Amazon S3 バケットにこれらのレコードを配信する前に、設定されたバッファリングヒント (サイズと時間) に基づいて特定のパーティションに属するレコードを内部的にバッファリングします。最大サイズのオブジェクトを配信するために、Kinesis Data Firehose は内部で多段階バッファリングを使用します。したがって、 end-to-end レコードのバッチの遅延は、設定されているバッファリングヒント時間の 1.5 倍になる可能性があります。これはデリバリーストリームのデータの鮮度に影響します。

アクティブパーティション数は、配信バッファ内のアクティブパーティションの総数です。たとえば、動的パーティショニングクエリが 1 秒あたり 3 つのパーティションを構築し、60 秒ごとに配信をトリガーするバッファのヒント設定がある場合、平均して 180 個のアクティブパーティションが作成されます。Kinesis Data Firehose がパーティション内のデータを送信先に配信できない場合、このパーティションは、配信可能になるまで配信バッファ内でアクティブとしてカウントされます。

レコードデータフィールドと S3 プレフィックス式に基づいて S3 プレフィクスが新しい値に評価されると、新しいパーティションが作成されます。アクティブパーティションごとに新しいバッファが作成されます。同じ評価された S3 プレフィックスを持つ後続のすべてのレコードが、そのバッファに配信されます。バッファがバッファサイズ制限またはバッファ時間間隔を満たすと、Amazon Kinesis Data Firehose はバッファデータを含むオブジェクトを作成し、指定した Amazon S3 プレフィックスに配信します。オブジェクトが配信されると、そのパーティションのバッファとパーティション自体が削除され、アクティブなパーティション数から除外されます。Amazon Kinesis Data Firehose は、各パーティションのバッファサイズまたは間隔が個別に満たされると、各バッファデータを 1 つのオブジェクトとして配信します。アクティブパーティションの数が配信ストリームごとの 500 個の制限に達すると、配信ストリームの残りのレコードは、指定された S3 エラーバケットプレフィックスに配信されます。