ソースとしての Apache Kafka ストリーム - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ソースとしての Apache Kafka ストリーム

Apache Kafka は、データパイプラインやストリーミング分析などのワークロードをサポートする、オープンソースのイベントストリーミングプラットフォームです。Amazon Managed Streaming for Apache Kafka (Amazon MSK) またはセルフマネージド Apache Kafka クラスターを使用できます。用語 AWS では、セルフマネージドクラスターとは、 によってホストされていないすべての Apache Kafka クラスターを指します AWS。これには、自分で管理するクラスターと、、、 Confluent Cloud CloudKarafkaなどのサードパーティープロバイダーによってホストされるクラスターの両方が含まれますRedpanda

クラスターの他の AWS ホスティングオプションの詳細については、 AWS ビッグデータブログの「 で Apache Kafka を実行するためのベストプラクティス AWS」を参照してください。

ソースとしての Apache Kafka は、Amazon Simple Queue Service (Amazon SQS) または Amazon Kinesis の使用と同様に動作します。 EventBridge は、ソースからの新しいメッセージを内部的にポーリングし、ターゲットを同期的に呼び出します。 EventBridge はメッセージをバッチで読み取り、これらをイベントペイロードとして関数に提供します。最大バッチサイズは調整可能です。(デフォルト値は 100 メッセージ)。

Apache Kafka ベースのソースの場合、 はバッチ処理ウィンドウやバッチサイズなどの処理コントロールパラメータ EventBridge をサポートします。

EventBridge は、パイプを呼び出すときにイベントパラメータにメッセージのバッチを送信します。イベントペイロードにはメッセージの配列が含まれています。各配列項目には、Apache Kafka トピックと Apache Kafka パーティション識別子の詳細が、タイムスタンプおよび base64 でエンコードされたメッセージとともに含まれています。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。

[ { "eventSource": "SelfManagedKafka", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "eventSourceKey": "mytopic-0", "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ]

Apache Kafka クラスター認証

EventBridge Pipes は、セルフマネージド Apache Kafka クラスターで認証するためのいくつかの方法をサポートしています。これらのサポートされる認証方法のいずれかを使用するように、Apache Kafka クラスターを設定しておいてください。Apache Kafka セキュリティの詳細については、Apache Kafka ドキュメントの「Security」(セキュリティ) セクションを参照してください。

VPC アクセス

VPC 内の Apache Kafka ユーザーのみが Apache Kafka ブローカーにアクセスできるセルフマネージド Apache Kafka 環境を使用している場合は、Apache Kafka ソースで Amazon Virtual Private Cloud (Amazon VPC) を設定する必要があります。

SASL/SCRAM 認証

EventBridge Pipes は、Transport Layer Security (TLS) 暗号化を使用した Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) 認証をサポートしています。 EventBridge Pipes は暗号化された認証情報を送信してクラスターで認証します。SASL/SCRAM 認証の詳細については、「RFC 5802」を参照してください。

EventBridge Pipes は、TLS 暗号化による SASL/PLAIN 認証をサポートしています。SASL/PLAIN 認証では、 EventBridge Pipes は認証情報をクリアテキスト (暗号化されていない) としてサーバーに送信します。

SASL 認証の場合は、サインイン認証情報をシークレットとして AWS Secrets Managerに保存します。

相互 TLS 認証

相互 TLS (mTLS) は、クライアントとサーバー間の双方向認証を提供します。クライアントは、サーバーによるクライアントの検証のためにサーバーに証明書を送信し、サーバーは、クライアントによるサーバーの検証のためにクライアントに証明書を送信します。

セルフマネージド Apache Kafka では、 EventBridge Pipes がクライアントとして機能します。Apache Kafka ブローカーで EventBridge Pipes を認証するように、クライアント証明書を (Secrets Manager のシークレットとして) 設定します。クライアント証明書は、 サーバーのトラストストア内の認証局 (CA) によって署名される必要があります。

Apache Kafka クラスターは Pipes にサーバー証明書を送信して、 EventBridge Pipes で Apache Kafka EventBridge ブローカーを認証します。サーバー証明書は、パブリック CA 証明書またはプライベート CA/自己署名証明書にすることができます。パブリック CA 証明書は、 EventBridge Pipes トラストストアにある CA によって署名される必要があります。プライベート CA/自己署名証明書の場合は、サーバーのルート CA 証明書を (Secrets Manager のシークレットとして) 設定します。 EventBridge Pipes はルート証明書を使用して Apache Kafka ブローカーを検証します。

mTLS の詳細については、「Introducing mutual TLS authentication for Amazon MSK as an source」(ソースとしての Amazon MSK のための相互 TLS の紹介) を参照してください。

クライアント証明書シークレットの設定

CLIENT_CERTICATE_TLS_AUTH シークレットは、証明書フィールドとプライベートキーフィールドを必要とします。暗号化されたプライベートキーの場合、シークレットはプライベートキーのパスワードを必要とします。証明書とプライベートキーは、どちらも PEM 形式である必要があります。

注記

EventBridge Pipes は、PBES1 (PBES2 はサポートしていません) プライベートキー暗号化アルゴリズムをサポートしています。

証明書フィールドには、クライアント証明書で始まり、その後に中間証明書が続き、ルート証明書で終わる証明書のリストが含まれている必要があります。各証明書は、以下の構造を使用した新しい行で始める必要があります。

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager は最大 65,536 バイトのシークレットをサポートします。これは、長い証明書チェーンにも十分な領域です。

プライベートキーは、以下の構造を使用した PKCS #8 形式にする必要があります。

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

暗号化されたプライベートキーには、以下の構造を使用します。

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

以下は、暗号化されたプライベートキーを使用する mTLS 認証のシークレットの内容を示す例です。暗号化されたプライベートキーの場合は、シークレットにプライベートキーのパスワードを含めます。

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

サーバルート CA 証明書シークレットの設定

このシークレットは、Apache Kafka ブローカーがプライベート CA によって署名された証明書で TLS 暗号化を使用する場合に作成します。TLS 暗号化は、VPC、SASL/SCRAM、SASL/PLAIN、または mTLS 認証に使用できます。

サーバールート CA 証明書シークレットには、PEM 形式の Apache Kafka ブローカーのルート CA 証明書が含まれるフィールドが必要です。以下は、このシークレットの構造を示す例です。

{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"

ネットワーク構成

プライベート VPC 接続を使用するセルフマネージド Apache Kafka 環境を使用している場合は、Apache Kafka ブローカーに関連付けられた Amazon Virtual Private Cloud (Amazon VPC) リソースにアクセスできる EventBridge 必要があります。

  • Apache Kafka クラスターの VPC にアクセスするには、ソースのサブネットにアウトバウンドインターネットアクセス EventBridge を使用できます。パブリックサブネットの場合、これはマネージド NAT ゲートウェイである必要があります。プライベートサブネットの場合は NAT ゲートウェイでも、独自の NAT でもかまいません。NAT にパブリック IP アドレスが割り当てられ、インターネットに接続できることを確認します。

  • EventBridge Pipes は を介したイベント配信もサポートしているためAWS PrivateLink、 Amazon Virtual Private Cloud (Amazon VPC) にあるイベントソースから Pipes ターゲットにイベントを送信できます。パブリックインターネットを経由する必要はありません。Pipes を使用すると、インターネットゲートウェイをデプロイしたり、ファイアウォールルールを設定したり、プロキシサーバーを設定したりすることなく、 Amazon Managed Streaming for Apache Kafka (Amazon MSK)、セルフマネージド Apache Kafka、およびプライベートサブネットに存在する Amazon MQ ソースからポーリングできます。

    VPC エンドポイントを設定するには、「 ユーザーガイド」の「VPC エンドポイントの作成AWS PrivateLink 」を参照してください。サービス名で、 を選択しますcom.amazonaws.region.pipes-data

Amazon VPC セキュリティグループは、少なくとも以下のルールを使用して設定してください。

  • インバウンドルール – ソースに指定されたセキュリティグループの Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ソースに指定されたセキュリティグループの Apache Kafka ブローカーポート上のすべてのトラフィックを許可します。

    ブローカーポートには以下が含まれます。

    • プレーンテキストの場合は 9092

    • TLS の場合は 9094

    • SASL の場合は 9096

    • IAM 用 9098

Apache Kafka ソースによるコンシューマーの自動スケーリング

最初に Apache Kafka ソースを作成すると、 は Kafka トピック内のすべてのパーティションを処理するために 1 つのコンシューマーを EventBridge 割り当てます。各コンシューマーには、増加したワークロードを処理するために同時実行される複数のプロセッサがあります。さらに、 は、ワークロードに基づいてコンシューマーの数 EventBridge を自動的にスケールアップまたはスケールダウンします。各パーティションでメッセージの順序を保つため、コンシューマーの最大数は、トピック内のパーティションあたり 1 つとなっています。

1 分間隔で、 はトピック内のすべてのパーティションのコンシューマーオフセットラグ EventBridge を評価します。遅延が高すぎる場合、パーティションはメッセージを処理 EventBridge できるよりも速くメッセージを受信しています。必要に応じて、コンシューマーをトピック EventBridge に追加または削除します。コンシューマーを追加または削除するスケーリングプロセスは、評価から 3 分以内に行われます。

ターゲットが過負荷になっている場合、 はコンシューマーの数 EventBridge を減らします。このアクションにより、コンシューマーが取得し関数に送信するメッセージの数が減り、関数への負荷が軽減されます。