Amazon MQ で Lambda を使用する - AWS Lambda

Amazon MQ で Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon MQ は、Apache ActiveMQ および RabbitMQ 用のマネージドメッセージブローカーサービスです。メッセージブローカーを使用すると、ソフトウェアアプリケーションおよびコンポーネントは、さまざまなプログラミング言語、オペレーティングシステム、および、トピックまたはキューイベント送信先を介した正式なメッセージングプロトコルを使って、通信できるようになります。

また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、もしくは、異なるネットワークトポロジやその他のインフラストラクチャのニーズを提供することにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。

Lambda 関数を使用することで、Amazon MQ メッセージブローカーからのレコードを処理できます。Lambda は、ブローカーからメッセージを読み取り関数を同期的に呼び出す Lambda リソース、イベントソースマッピングによって関数を呼び出します。

警告

Lambda イベントソースマッピングは各イベントを少なくとも 1 回処理し、レコードの重複処理が発生する可能性があります。重複するイベントに関連する潜在的な問題を避けるため、関数コードを冪等にすることを強くお勧めします。詳細については、 AWS ナレッジセンターの「Lambda 関数を冪等にするにはどうすればよいですか?」を参照してください。

Amazon MQ イベントソースマッピングには、次の設定制限があります。

  • 同時実行数 — Amazon MQ イベントソースマッピングを使用する Lambda 関数には、デフォルトの最大同時実行数設定があります。ActiveMQ の場合、Lambda サービスは同時実行環境の数を 5 つに制限します。RabbitMQ の場合、同時実行環境の数は 1 つに制限されます。関数の予約またはプロビジョニングされる同時実行数設定を変更しても、Lambda サービスはこれ以上実行環境を利用できるようにしません。デフォルトの最大同時実行数の増加をリクエストするには、AWS Support にお問い合わせください。

  • クロスアカウント - Lambda はクロスアカウント処理をサポートしていません。Lambda を使用して、別の AWS アカウント にある Amazon MQ メッセージブローカーからのレコードを処理することはできません。

  • 認証 - ActiveMQ では、ActiveMQ SimpleAuthenticationPlugin のみサポートされています。RabbitMQ の場合、PLAIN 認証メカニズムのみサポートされています。ユーザーは、資格情報の管理には AWS Secrets Manager を使用します。ActiveMQ 認証の詳細については、Amazon MQ デベロッパーガイドIntegrating ActiveMQ brokers with LDAP を参照してください。

  • 接続クォータ - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、Amazon MQ デベロッパーガイドQuotas in Amazon MQBrokers のセクションを参照してください。

  • 接続 - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、Lambda 関数が VPC にアクセスしてメッセージを受信する必要があります。詳細については、このトピックで後述する「ネットワーク構成」を参照してください。

  • イベント送信先 - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、内部的にトピックとして動作し、キューとして Lambda と対話しながら動作します。詳細については、Apache ActiveMQ ウェブサイトの Virtual Destinations および RabbitMQ ウェブサイトの Virtual Hostsを参照してください。

  • ネットワークトポロジ - ActiveMQ の場合、イベントソースマッピングごとに、1つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、イベントソースマッピングごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、Amazon MQ デベロッパーガイドActive MQ Broker Architecture および Rabbit MQ Broker Architecture を参照してください。

  • プロトコル — サポートされるプロトコルは、Amazon MQ の統合のタイプによって異なります。

    • ActiveMQ 統合の場合、Lambda は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。JMS プロトコル内では、TextMessage および BytesMessage のみがサポートされています。Lambda は、JMS カスタムプロパティもサポートしています。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの OpenWire を参照してください。

    • RabbitMQ 統合の場合、Lambda は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの AMQP 0-9-1 Complete Reference Guide を参照してください。

Lambda は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、Amazon MQ デベロッパーガイドAmazon MQ リリースノートを参照してください。

注記

デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイのないブローカーの場合、Lambda はそのウィンドウ中にメッセージを処理できません。

Lambda コンシューマーグループ

Amazon MQ と対話するため、Lambda は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。

Amazon MQ イベントソースの場合、Lambda はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。Lambda は、最大 6 MB のペイロードサイズを処理する、バッチ処理ウィンドウの期限が切れる、またはレコード数が完全なバッチサイズに到達するまで、メッセージをプルします。詳細については、「バッチ処理動作」を参照してください。

コンシューマーグループは、メッセージをバイトの BLOB として取得し、それらを base64 でエンコードして単一の JSON ペイロードに変換してから、関数を呼び出します。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

Amazon CloudWatch の ConcurrentExecutions メトリクスを使用して、特定の関数の同時実行使用率を監視できます。同時実行の詳細については、「関数に対する予約済み同時実行数の設定」を参照してください。

例 Amazon MQ レコードイベント
ActiveMQ
{ "eventSource": "aws:mq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注記

RabbitMQ の例では、pizzaQueue は RabbitMQ キューの名前、/ は仮想ホストの名前です。メッセージを受信すると、イベントソースは pizzaQueue::/ の下にメッセージを一覧表示します。

実行ロールのアクセス許可

Amazon MQ ブローカーからレコードを読み取るには、Lambda 関数は次のアクセス許可を実行ロールに追加する必要があります。

注記

暗号化されたカスタマー管理キーを使用する場合は、kms:Decrypt 権限も追加します。

ネットワーク構成

イベントソースマッピングを通じて Lambda にブローカーへのフルアクセスを許可するには、ブローカーがパブリックエンドポイント (パブリック IP アドレス) を使用するか、ブローカーを作成した Amazon VPC へのアクセスを提供する必要があります。

デフォルトでは、Amazon MQ ブローカーを作成すると、PubliclyAccessible フラグは false に設定されます。ブローカーがパブリック IP アドレスを受け取るには、PubliclyAccessible フラグを true にする必要があります。

Lambda で Amazon MQ を使用するときのベストプラクティスは、「VPC エンドポイント」を使用し、Lambda 関数にブローカーの VPC へのアクセスを付与することです。Lambda にエンドポイントをデプロイし、ActiveMQ のみの場合は AWS Security Token Service (AWS STS) のエンドポイントをデプロイします。ブローカーが認証を使用する場合、AWS Secrets Manager にもエンドポイントをデプロイします。詳細については、「VPCエンドポイントの使用」を参照してください。

または、Amazon MQ ブローカーを含む VPC の各パブリックサブネットに NAT ゲートウェイを設定します。詳細については、「VPC に接続された Lambda 関数にインターネットアクセスを有効にする」を参照してください。

Amazon MQ ブローカーのイベントソースマッピングを作成すると、Lambda はブローカーの VPC のサブネットおよびセキュリティグループに Elastic Network Interface (ENI) が既に存在するかどうかを確認します。Lambda が既存の ENI を検出した場合、再利用しようとします。それ以外の場合、Lambda は新しい ENI を作成し、イベントソースに接続して関数を呼び出します。

注記

Lambda 関数は、Lambda サービスが所有する VPC 内で常に実行されます。これらの VPC はサービスによって自動的に管理され、顧客には表示されません。関数を Amazon VPC に接続することもできます。いずれの場合、関数の VPC 設定はイベントソースマッピングに影響しません。Lambda がイベントソースに接続する方法を判定するのは、イベントソースの VPC の設定のみです。

VPC セキュリティグループのルール

次のルール (最低限) に従ってクラスターを含む Amazon VPC のセキュリティグループを設定してください。

  • インバウンドルール – イベントソースに指定されたセキュリティグループの、独自のセキュリティグループ内からのブローカーポート上のすべてのトラフィックを許可します。ActiveMQ はデフォルトでポート 61617 を使用し、RabbitMQ はデフォルトでポート 5671 を使用します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ブローカーポートのすべてのトラフィックを独自のセキュリティグループ内で許可します。ActiveMQ はデフォルトでポート 61617 を使用し、RabbitMQ はデフォルトでポート 5671 を使用します。

  • NAT ゲートウェイの代わりに VPC エンドポイントを使用する場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

VPCエンドポイントの使用

VPC エンドポイントを使用するとき、ENI を使用し、関数を呼び出す API コールはこれらのエンドポイントを経由してルーティングされます。Lambda サービスプリンシパルは、これらの ENI を使用するすべての関数に lambda:InvokeFunction を呼び出す必要があります。さらに、ActiveMQ の場合、Lambda サービスプリンシパルは ENI を使用するロールに sts:AssumeRole を呼び出す必要があります。

デフォルトでは、VPC エンドポイントはオープンな IAM ポリシーがあります。特定のプリンシパルのみがそのエンドポイントを使用して必要なアクションを実行するため、これらのポリシーを制限することがベストプラクティスです。イベントソースマッピングが Lambda 関数を呼び出せるようにするには、VPC エンドポイントポリシーは Lambda サービスプリンシパルが lambda:InvokeFunction を呼び出せるようにする必要があります。ActiveMQ の場合は sts:AssumeRole です。組織内で発生する API コールのみを許可するように VPC エンドポイントポリシーを制限すると、イベントソースマッピングが正しく機能しなくなります。

次の VPC エンドポイントポリシーの例では、AWS STS および Lambda エンドポイントに必要なアクセスを付与する方法を示しています。

例 VPC エンドポイントポリシー - AWS STS エンドポイント (ActiveMQ のみ)
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
例 VPC エンドポイントポリシー – Lambda エンドポイント
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Amazon MQ ブローカーが認証を使用する場合、Secrets Manager エンドポイントの VPC エンドポイントポリシーを制限することもできます。Secrets Manager API を呼び出す場合、Lambda は Lambda サービスプリンシパルではなく、関数ロールを使用します。次の例では、Secrets Manager エンドポイントポリシーを示します。

例 VPC エンドポイントポリシー - Secrets Manager エンドポイント
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

アクセス許可を追加し、イベントソースマッピングを作成するには

イベントソースマッピングを作成し、Amazon MQ ブローカーから Lambda 関数にレコードを送信するよう Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、単一の関数で複数のソースから項目を処理したりできます。

Amazon MQ から読み取るように関数を設定するには、必要なアクセス許可を追加し Lambda コンソールで MQ トリガーを作成します。

アクセス許可を追加してトリガーを作成するには
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [Configuration] (設定) タブを開き、次に [Permissions] (アクセス許可) をクリックします。

  4. [実行ロール] で、実行ロールのリンクを選択します。このリンクを選択すると、IAM コンソールでロールが開きます。

    実行ロールのリンク
  5. アクセス許可を追加、インラインポリシーを作成の順に選択します。

    IAMコンソールでインラインポリシーを作成する
  6. [ポリシーエディター] で、[JSON] を選択します。以下のポリシーを入力します。Amazon MQ ブローカーから読み取るには、関数にこれらのアクセス許可が必要です。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "mq:DescribeBroker", "secretsmanager:GetSecretValue", "ec2:CreateNetworkInterface", "ec2:DeleteNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
    注記

    暗号化されたカスタマー管理キーを使用する場合は、kms:Decrypt アクセス許可も追加する必要があります。

  7. [Next] を選択します。ポリシー名を入力し、[ポリシーの作成] を選択します。

  8. Lambda コンソールの関数に戻ります。[関数の概要] で [トリガーを追加] をクリックします。

    Lambda コンソールの関数の概要セクション
  9. MQトリガータイプを選択します。

  10. 必須のオプションを設定し、[追加] を選択します。

Lambda では、Amazon MQ イベントソースの以下のオプションがサポートされています。

  • MQ ブローカー — Amazon MQ ブローカーを選択します。

  • バッチサイズ - 単一のバッチで取得するメッセージの最大数を設定します。

  • キュー名 - 使用する Amazon MQ キューを入力します。

  • ソースアクセス設定 — 仮想ホスト情報とブローカーの認証情報を保存する Secrets Manager のシークレットを入力します。

  • トリガーの有効化 - レコードの処理を停止するトリガーを無効にします。

トリガーを有効または無効にする(または削除する)には、 MQ トリガーをデザイナーで選択します。トリガーを再設定するには、イベントソースマッピング API 操作を使用します。

イベントソースマッピングの更新

イベントソースマッピングを更新するには、update-event-source-mapping コマンドを使用します。次のコマンド例では、イベントソースマッピングを更新して、バッチサイズを 2 にします。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

Lambda は、これらの設定を非同期的に更新します。このプロセスが完了するまで、出力には変更は反映されません。get-event-source-mapping コマンドを使用して、リソースの現在のステータスを表示します。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

イベントソースマッピングエラー

Lambda 関数で回復不可能なエラーが発生すると、Amazon MQ コンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、StateTransitionReason から返された詳細の EventSourceMapping フィールドをチェックし、以下のいずれかのコードを探します。

ESM_CONFIG_NOT_VALID

イベントソースマッピングの設定が無効です。

EVENT_SOURCE_AUTHN_ERROR

Lambda がイベントソースの認証に失敗しました。

EVENT_SOURCE_AUTHZ_ERROR

Lambda にイベントソースへのアクセス許可がありません。

FUNCTION_CONFIG_NOT_VALID

関数の設定が無効です。

レコードは、サイズが原因で Lambda が削除した場合も、未処理になります。Lambda レコードのサイズ上限は 6 MB です。関数エラー時にメッセージを再配信するには、デッドレターキュー(DLQ)を使用します。詳細については、Apache ActiveMQ ウェブサイトの Message Redelivery and DLQ Handling および RabbitMQ ウェブサイトの Reliability Guide を参照してください。

注記

Lambda はカスタム再配信ポリシーをサポートしていません。代わりに Lambda は、Apache ActiveMQ ウェブサイトの Redelivery Policy ページのデフォルト値のポリシーを使用し、maximumRedeliveries は 6 に設定します。

Amazon MQ と RabbitMQ の設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MQ と RabbitMQ に適用されるのは一部のパラメータのみです。

Amazon MQ と RabbitMQ に適用されるイベントソースパラメータ
[Parameter] (パラメータ) 必須 デフォルト メモ

BatchSize

N

100

最大: 10,000

有効

N

true

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

キュー

N

消費する Amazon MQ ブローカーの送信先キューの名前。

SourceAccessConfigurations

N

ActiveMQ の場合、BASIC_AUTH 認証情報です。RabbitMQ の場合、BASIC_AUTH 認証情報と VIRTUAL_HOST 情報の両方を含めることができます。