Amazon ECS イベントの処理 - Amazon Elastic Container Service

Amazon ECS イベントの処理

Amazon ECS は、少なくとも 1 回 の割合でイベントを送信します。つまり、あるイベントの複数のコピーを受け取る場合があるということです。さらに、イベントは発生順にイベントリスナーに送信されない場合があります。

イベントを適切に順序付けられるように、各イベントの detail セクションには version プロパティが含まれています。リソースの状態が変わるたびに、この version はインクリメントされます。重複するイベントには、detail オブジェクト内で同じ version バージョンがあります。EventBridge で Amazon ECS コンテナインスタンスおよびタスク状態をレプリケートする場合は、Amazon ECS API によって報告されるリソースのバージョンと、そのリソースについて EventBridge で報告される version を比較して、イベントストリームでのバージョンが最新であることを確認できます。バージョンプロパティ番号が高いイベントは、バージョン番号が低いイベントより後で発生したものとして処理されます。

例: AWS Lambda 関数でのイベントの処理

次の Python 3.9 で記述された Lambda 関数の例では、タスク状態変更イベントとコンテナインスタンス状態変更イベントの両方をキャプチャし、2 つの Amazon DynamoDB テーブルのいずれかに保存します。

  • ECSCtrInstanceState – コンテナインスタンスの最新状態を保存します。テーブル ID は、コンテナインスタンスの containerInstanceArn 値です。

  • ECSTaskState – タスクの最新状態を保存します。テーブル ID は、タスクの taskArn 値です。

import json import boto3 def lambda_handler(event, context): id_name = "" new_record = {} # For debugging so you can see raw event format. print('Here is the event:') print((json.dumps(event))) if event["source"] != "aws.ecs": raise ValueError("Function only supports input from events with a source type of: aws.ecs") # Switch on task/container events. table_name = "" if event["detail-type"] == "ECS Task State Change": table_name = "ECSTaskState" id_name = "taskArn" event_id = event["detail"]["taskArn"] elif event["detail-type"] == "ECS Container Instance State Change": table_name = "ECSCtrInstanceState" id_name = "containerInstanceArn" event_id = event["detail"]["containerInstanceArn"] else: raise ValueError("detail-type for event is not a supported type. Exiting without saving event.") new_record["cw_version"] = event["version"] new_record.update(event["detail"]) # "status" is a reserved word in DDB, but it appears in containerPort # state change messages. if "status" in event: new_record["current_status"] = event["status"] new_record.pop("status") # Look first to see if you have received a newer version of an event ID. # If the version is OLDER than what you have on file, do not process it. # Otherwise, update the associated record with this latest information. print("Looking for recent event with same ID...") dynamodb = boto3.resource("dynamodb", region_name="us-east-1") table = dynamodb.Table(table_name) saved_event = table.get_item( Key={ id_name : event_id } ) if "Item" in saved_event: # Compare events and reconcile. print(("EXISTING EVENT DETECTED: Id " + event_id + " - reconciling")) if saved_event["Item"]["version"] < event["detail"]["version"]: print("Received event is a more recent version than the stored event - updating") table.put_item( Item=new_record ) else: print("Received event is an older version than the stored event - ignoring") else: print(("Saving new event - ID " + event_id)) table.put_item( Item=new_record )

以下の Fargate の例では、Python 3.9 で書かれた Lambda 関数で、タスク状態変更イベントを取得し、以下の Amazon DynamoDB テーブルに保存しています。

import json import boto3 def lambda_handler(event, context): id_name = "" new_record = {} # For debugging so you can see raw event format. print('Here is the event:') print((json.dumps(event))) if event["source"] != "aws.ecs": raise ValueError("Function only supports input from events with a source type of: aws.ecs") # Switch on task/container events. table_name = "" if event["detail-type"] == "ECS Task State Change": table_name = "ECSTaskState" id_name = "taskArn" event_id = event["detail"]["taskArn"] else: raise ValueError("detail-type for event is not a supported type. Exiting without saving event.") new_record["cw_version"] = event["version"] new_record.update(event["detail"]) # "status" is a reserved word in DDB, but it appears in containerPort # state change messages. if "status" in event: new_record["current_status"] = event["status"] new_record.pop("status") # Look first to see if you have received a newer version of an event ID. # If the version is OLDER than what you have on file, do not process it. # Otherwise, update the associated record with this latest information. print("Looking for recent event with same ID...") dynamodb = boto3.resource("dynamodb", region_name="us-east-1") table = dynamodb.Table(table_name) saved_event = table.get_item( Key={ id_name : event_id } ) if "Item" in saved_event: # Compare events and reconcile. print(("EXISTING EVENT DETECTED: Id " + event_id + " - reconciling")) if saved_event["Item"]["version"] < event["detail"]["version"]: print("Received event is a more recent version than the stored event - updating") table.put_item( Item=new_record ) else: print("Received event is an older version than the stored event - ignoring") else: print(("Saving new event - ID " + event_id)) table.put_item( Item=new_record )