Amazon Data Firehose を使用したテーブルへのデータのストリーミング - Amazon Simple Storage Service

Amazon Data Firehose を使用したテーブルへのデータのストリーミング

Amazon Data Firehose は、Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk、Apache Iceberg テーブル、カスタム HTTP エンドポイント、またはサポートされているサードパーティーのサービスプロバイダーが所有する HTTP エンドポイントなどのターゲットにリアルタイムのストリーミングデータを配信するためのフルマネージドサービスです。Amazon Data Firehose を使用すると、アプリケーションを記述したり、リソースを管理したりする必要はありません。Firehose にデータを送信するデータプロデューサーを作成すると、それにより、指定した送信先にデータが自動配信されます。データを配信前に変換するように、Firehose を設定することもできます。Amazon Data Firehose の詳細については、「Amazon Data Firehose とは」を参照してください。

S3 テーブルバケット内のテーブルへの Firehose ストリーミングを設定するには、次の手順に従います。

  1. テーブルバケットを AWS 分析サービスと統合します

  2. S3 テーブルにデータを配信するように Firehose を設定します。そのためには、Firehose がテーブルにアクセスできるように AWS Identity and Access Management (IAM) サービスロールを作成します

  3. Firehose サービスロールにテーブルまたはテーブルの名前空間に対する明示的なアクセス許可を付与します。詳細については、「テーブルリソースに対する Lake Formation 許可の付与」を参照してください。

  4. テーブルにデータをルーティングする Firehose ストリームを作成します。

Firehose が S3 テーブルを送信先として使用するロールの作成

Firehose には、AWS Glue テーブルにアクセスし、S3 テーブルにデータを書き込むための特定のアクセス許可を持つ IAM サービスロールが必要です。Firehose ストリームを作成する際に、この IAM ロールが必要です。

  1. IAM コンソール (https://console.aws.amazon.com/iam/) を開きます。

  2. 左のナビゲーションペインの [ポリシー] を選択します。

  3. [ポリシーを作成] を選択し、ポリシーエディタで [JSON] を選択します。

  4. データカタログ内のすべてのデータベースとテーブルにアクセス許可を付与する次のインラインポリシーを追加します。必要に応じて、特定のテーブルとデータベースにのみ許可を付与できます。このポリシーを使用するには、user input placeholders をユーザー自身の情報に置き換えます。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "S3TableAccessViaGlueFederation", "Effect": "Allow", "Action": [ "glue:GetTable", "glue:GetDatabase", "glue:UpdateTable" ], "Resource": [ "arn:aws:glue:region:account-id:catalog/s3tablescatalog/*", "arn:aws:glue:region:account-id:catalog/s3tablescatalog", "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/*", "arn:aws:glue:region:account-id:table/*/*" ] }, { "Sid": "S3DeliveryErrorBucketPermission", "Effect": "Allow", "Action": [ "s3:AbortMultipartUpload", "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::error delivery bucket", "arn:aws:s3:::error delivery bucket/*" ] }, { "Sid": "RequiredWhenUsingKinesisDataStreamsAsSource", "Effect": "Allow", "Action": [ "kinesis:DescribeStream", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:ListShards" ], "Resource": "arn:aws:kinesis:region:account-id:stream/stream-name" }, { "Sid": "RequiredWhenDoingMetadataReadsANDDataAndMetadataWriteViaLakeformation", "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": "*" }, { "Sid": "RequiredWhenUsingKMSEncryptionForS3ErrorBucketDelivery", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKey" ], "Resource": [ "arn:aws:kms:region:account-id:key/KMS-key-id" ], "Condition": { "StringEquals": { "kms:ViaService": "s3.region.amazonaws.com" }, "StringLike": { "kms:EncryptionContext:aws:s3:arn": "arn:aws:s3:::error delivery bucket/prefix*" } } }, { "Sid": "LoggingInCloudWatch", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:log-group-name:log-stream:log-stream-name" ] }, { "Sid": "RequiredWhenAttachingLambdaToFirehose", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": [ "arn:aws:lambda:region:account-id:function:function-name:function-version" ] } ] }

    このポリシーには、Kinesis Data Streams へのアクセス、Lambda 関数の呼び出し、AWS KMS キーへのアクセスを許可するステートメントが含まれています。これらのリソースを使用しない場合は、それぞれのステートメントを削除できます。

    エラーログ記録が有効になっている場合、Firehose はデータ配信エラーを CloudWatch ロググループとストリームにも送信します。このためには、ロググループとログストリーム名を設定する必要があります。ロググループとログストリーム名については、「Monitor Amazon Data Firehose Using CloudWatch Logs」を参照してください。

  5. ポリシーを作成したら、[信頼されたエンティティタイプ] として [AWS サービス] を使用して IAM ロールを作成します。

  6. [サービスまたはユースケース] で、[Kinesis] を選択します。[ユースケース]で、[Kinesis Firehose] を選択します。

  7. [次へ] を選択して、以前に作成したポリシーを選択します。

  8. ロールに名前を付けます。ロールの詳細を確認し、[ロールの作成] を選択します。ロールには、以下の信頼ポリシーがあります。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "sts:AssumeRole" ], "Principal": { "Service": [ "firehose.amazonaws.com" ] } } ] }

S3 テーブルへの Firehose ストリームの設定

次の手順は、コンソールを使用して S3 テーブルにデータを配信するように Firehose ストリームを作成する方法を示しています。S3 テーブルへの Firehose ストリームを設定するには、次の前提条件が必要です。

ストリームを設定するときに Firehose にルーティング情報を提供するには、データベース名とその名前空間内のテーブル名として名前空間を使用します。これらの値は、Firehose ストリーム設定の一意のキーセクションで使用して、データを 1 つのテーブルにルーティングできます。この値を使用して、JSON クエリ式を使用してテーブルにルーティングすることもできます。詳細については、「Route incoming records to a single Iceberg table」を参照してください。

S3 テーブルへの Firehose ストリームを設定するには (コンソール)
  1. Firehose コンソール (https://console.aws.amazon.com/firehose/) を開きます。

  2. [Firehose ストリームを作成] を選択します。

  3. [ソース] で、以下のいずれかのオプションを選択します。

    • Amazon Kinesis Data Streams

    • Amazon MSK

    • ダイレクト PUT

  4. [送信先] で、[Apache Iceberg テーブル] を選択します。

  5. Firehose ストリーム名を入力します。

  6. [ソース設定] を構成します。

  7. [送信先の設定] では、自分のアカウントのテーブルにストリーミングする場合は [現在のアカウント] を選択し、別のアカウントのテーブルにストリーミングする場合は [クロスアカウント] を選択します。

    • [現在のアカウント] のテーブルには、[カタログ] ドロップダウンから [S3 Tables カタログ] を選択します。

    • [クロスアカウント] のテーブルには、別のアカウントのストリーミング先のカタログの [カタログ ARN] を入力します。

  8. [一意のキー設定]、JSONQuery 式、または Lambda 関数を使用して、データベース名とテーブル名を設定します。詳細については、「Firehose デベロッパーガイド」の「着信レコードを単一の Iceberg テーブルにルーティングする」および「着信レコードを異なる Iceberg テーブルにルーティングする」を参照してください。

  9. [バックアップ設定] で、[S3 バックアップバケット] を指定します。

  10. [詳細設定][既存の IAM ロール] で、Firehose 用に作成した IAM ロールを選択します。

  11. [Firehose ストリームを作成] を選択します。

ストリームに対して設定できるその他の設定の詳細については、「Amazon Firehose デベロッパーガイド」の「Firehose ストリームを設定する」を参照してください。