Amazon Managed Streaming for Apache Kafka からのストリーミング取り込みを開始する - Amazon Redshift

Amazon Managed Streaming for Apache Kafka からのストリーミング取り込みを開始する

Amazon Redshift ストリーミング取り込みの目的は、ストリーミングサービスから Amazon Redshift または Amazon Redshift Serverless にストリームデータを直接取り込むプロセスを簡略化することです。これは Amazon MSK と Amazon MSK Severless、および Kinesis で動作します。Amazon Redshift ストリーミング取り込みにより、ストリームデータを Redshift に取り込む前に、Kinesis Data Streams ストリームまたは Amazon MSK トピックを Amazon S3 にステージングする必要がなくなります。

技術的なレベルでは、Amazon Kinesis Data Streams と Amazon Managed Streaming for Apache Kafka の両方からのストリーミング取り込みは、Amazon Redshift のマテリアライズドビューへのストリームまたはトピックデータの低レイテンシーで高速な取り込みを提供します。セットアップ後、マテリアライズドビューの更新を使用すると、大量のデータを取り込むことができます。

以下の手順を実行して、Amazon MSK 用の Amazon Redshift ストリーミング取り込みをセットアップします。

  1. ストリーミングデータソースにマッピングする外部スキーマを作成します。

  2. 外部スキーマを参照するマテリアライズドビューを作成します。

Amazon Redshift ストリーミング取り込みを設定する前に、Amazon MSK ソースが利用可能になっている必要があります。ソースがない場合は、「Amazon MSK の使用を開始する」の指示に従ってください。

注記

ストリーミング取り込みと Amazon Redshift Serverless - このトピックの設定手順は Amazon Redshift クラスターおよび Amazon Redshift Serverless の両方に適用されます。詳細については、「ストリーミング取り込みの動作とデータタイプ」を参照してください。

IAM アクセス許可の設定と Kafka からのストリーミング取り込みの実行

Amazon MSK クラスターが利用可能な場合の最初のステップは、Kafka トピックをデータソースとして Redshift で CREATE EXTERNAL SCHEMA を使用してスキーマを定義し、Kafka トピックを参照することです。その後、トピック内のデータにアクセスするために、マテリアライズドビュー内で STREAM を定義します。トピックからのレコードは半構造的な SUPER 形式で保存できます。あるいは、Amazon Redshift データ型に変換されたデータを出力するスキーマを定義することができます。マテリアライズドビューをクエリすると、返されるレコードにはその時点のトピックが反映されます。

  1. Amazon Redshift クラスターまたは Amazon Redshift Serverless がロールを引き受けることを許可する信頼ポリシーを持つ IAM ロールを作成します。IAM ロール向けに信頼ポリシーを設定する方法については、「ユーザーに代わって Amazon Redshift が他の AWS サービスにアクセスすることを許可する」を参照してください。作成されたロールには次の IAM ポリシーが設定されており、これにより、MSK クラスターとの通信に関するアクセス許可が提供されます。Amazon MSK を使用する場合、必要なポリシーは、クラスターで使用されている認証方法によって異なります。Amazon MSK で利用できる認証方法については、「Authentication and Authorization for Apache Kafka APIs」(Apache Kafka API の認証と承認) を参照してください。

    認証されていないアクセスを使用する Amazon MSK の IAM ポリシー:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "kafka:GetBootstrapBrokers" ], "Resource": "*" } ] }

    IAM 認証を使用する場合の Amazon MSK の IAM ポリシー:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKIAMpolicy", "Effect": "Allow", "Action": [ "kafka-cluster:ReadData", "kafka-cluster:DescribeTopic", "kafka-cluster:Connect" ], "Resource": [ "arn:aws:kafka:*:0123456789:cluster/MyTestCluster/*", "arn:aws:kafka:*:0123456789:topic/MyTestCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:*:0123456789:group/MyTestCluster/*" ] }, { "Sid": "MSKPolicy", "Effect": "Allow", "Action": [ "kafka:GetBootstrapBrokers" ], "Resource": "*" } ] }
  2. VPC を確認して、Amazon Redshift クラスターまたは Amazon Redshift Serverless に Amazon MSK クラスターに到達するためのルートがあることを確認します。Amazon MSK クラスターのインバウンドセキュリティグループルールでは、Amazon Redshift クラスターまたは Amazon Redshift Serverless ワークグループのセキュリティグループを許可する必要があります。Amazon MSK を使用する場合、指定するポートはクラスターに使用される認証方法によって異なります。詳細については、「ポート情報」と「AWS 内かつ VPC 外からのアクセス」を参照してください。

    ストリーミング取り込みでは、mTLS によるクライアント認証はサポートされないことに注意してください。詳細については、「制限」を参照してください。

    次の表は、Amazon MSK からのストリーミング取り込みに設定する、補足の設定オプションを示しています。

    Amazon Redshift の設定 Amazon MSK 設定 Redshift と Amazon MSK の間で開くポート
    AUTHENTICATION NONE TLS トランスポート無効 9092
    AUTHENTICATION NONE TLS トランスポート有効 9094
    AUTHENTICATION IAM IAM 9098/9198

    Amazon Redshift 認証は、CREATE EXTERNAL SCHEMA ステートメントで設定されます。

    Amazon MSK クラスターで相互 Transport Layer Security (mTLS) 認証が有効になっている場合は、AUTHENTICATION NONE を使用するよう Amazon Redshift を設定すると、認証されていないアクセスにはポート 9094 を使用するように指示されます。ただし、ポートは mTLS 認証で使用されているため、これは失敗します。このため、mTLS を使用する場合は AUTHENTICATION IAM に切り替えることをお勧めします。

  3. Amazon Redshift クラスターまたは Amazon Redshift Serverless ワークグループで拡張 VPC ルーティングを有効にします。詳細については、「拡張された VPC ルーティングの有効化」を参照してください。

    注記

    Amazon MSK ブートストラップブローカー URL を取得するために、Amazon Redshift は、アタッチされた IAM ロールによって提供されるアクセス許可を使用して GetBootstrapBrokers API コールを実行します。拡張された VPC ルーティングが有効な場合にこのリクエストを成功させるには、Amazon Redshift でプロビジョニングされたクラスターまたは Amazon Redshift Serverless ワークグループのサブネットに NAT ゲートウェイまたはインターネットゲートウェイが存在している必要があります。前述のサブネットのネットワーク ACL とセキュリティグループのアウトバウンドルールでも、Amazon MSK API サービスエンドポイントへのアクセスを許可する必要があります。詳細は、「Apache Kafka 用 Amazon Managed Streaming エンドポイントとクォータ」を参照してください。

  4. Amazon Redshift で、Amazon MSK クラスターにマッピングする外部スキーマを作成します。

    CREATE EXTERNAL SCHEMA MySchema FROM MSK IAM_ROLE { default | 'iam-role-arn' } AUTHENTICATION { none | iam } CLUSTER_ARN 'msk-cluster-arn';

    FROM の部分では、Amazon MSK は、スキーマが Managed Kafka Services のデータをマッピングしていることを示しています。

    Amazon MSK のストリーミング取り込みでは、外部スキーマを作成するときに次の認証タイプが提供されます。

    • none — 認証ステップがないことを指定します。

    • iam — IAM 認証を指定します。これを選択するときは、IAM ロールに IAM 認証のアクセス許可があることを確認します。

    TLS 認証やユーザー名とパスワードなど、その他の Amazon MSK 認証方法は、ストリーミング取り込みではサポートされていません。

    CLUSTER_ARN はストリーミング元の Amazon MSK クラスターを指定します。

  5. トピックからのデータを利用するためのマテリアライズドビューを作成します。エラーレコードをスキップしない場合は、このサンプルのような SQL コマンドを使用します。

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT * FROM MySchema."mytopic";

    次の例では、JSON ソースデータを使用するマテリアライズドビューを定義しています。次に示すビューでは、データが有効な JSON で UTF8 形式であることを検証しています。Kafka トピックの名前では大文字と小文字が区別され、その両方を使用することができます。名前が大文字のトピックからインジェストするには、データベースレベルで設定 enable_case_sensitive_identifiertrue に指定できます。詳細については、「名前と識別子」ならびに「enable_case_sensitive_identifier」を参照してください。

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT kafka_partition, kafka_offset, kafka_timestamp_type, kafka_timestamp, kafka_key, JSON_PARSE(kafka_value) as kafka_data, kafka_headers, refresh_time FROM MySchema."mytopic" WHERE CAN_JSON_PARSE(kafka_value);

    自動更新をオンにするには、AUTO REFRESH YES を使用してください。デフォルトの動作は手動更新です。

    メタデータ列には次の列が含まれます。

    メタデータ列 データ型 説明
    kafka_partition bigint Kafka トピックのレコードのパーティション ID
    kafka_offset bigint 指定されたパーティションの Kafka トピック内のレコードのオフセット
    kafka_timestamp_type char(1)

    Kafka レコードで使用されるタイムスタンプのタイプ:

    • C — クライアント側でのレコード作成時間 (CREATE_TIME)

    • L — Kafka サーバー側のレコード追加時間 (LOG_APPEND_TIME)

    • U — レコードの作成時刻が利用不可 (NO_TIMESTAMP_TYPE)

    kafka_timestamp タイムゾーンなしのタイムスタンプ レコードの timestamp 値
    kafka_key varbyte Kafka レコードのキー
    kafka_value varbyte Kafka から受け取ったレコード
    kafka_headers super Kafka から受け取ったレコードのヘッダー
    refresh_time タイムゾーンなしのタイムスタンプ 更新の開始時間

    マテリアライズドビュー定義内のビジネスロジックによっては、ビジネスロジックのエラーに伴ってストリーミング取り込みがブロックされることに注意してください。場合によっては、マテリアライズドビューを削除して再作成しなければならないことがあります。これを回避するには、ビジネスロジックをシンプルにし、インジェスト後のデータに追加のロジックを実行することをお勧めします。

  6. ビューを更新します。これにより、Amazon Redshift を呼び出してトピックから読み取り、データがマテリアライズドビューにロードされます。

    REFRESH MATERIALIZED VIEW MyView;
  7. マテリアライズドビュー内でデータをクエリします。

    select * from MyView;

    マテリアライズドビューは、REFRESH の実行時にトピックから直接更新されます。Kafka トピックのデータソースにマッピングするマテリアライズドビューを作成します。マテリアライズドビューの定義を行う際には、データをフィルタリングしたり集計したりできます。ストリーミング取り込みのマテリアライズドビュー (基盤のマテリアライズドビュー) は 1 つの Kafka トピックのみを参照します。ただし、追加のマテリアライズドビューを作成して基盤のマテリアライズドビューと結合したり、は別のマテリアライズドビューやテーブルと結合したりできます。

ストリーミング取り込みの制限の詳細については、「ストリーミング取り込みの動作とデータタイプ」を参照してください。