Verwenden von a DAG zum Schreiben benutzerdefinierter Metriken in CloudWatch - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden von a DAG zum Schreiben benutzerdefinierter Metriken in CloudWatch

Sie können das folgende Codebeispiel verwenden, um einen gerichteten azyklischen Graph (DAG) zu schreiben, mit dem Metriken auf Betriebssystemebene für eine PythonOperator Amazon-Umgebung abgerufen werden. MWAA DAGAnschließend werden die Daten als benutzerdefinierte Metriken bei Amazon veröffentlicht CloudWatch.

Benutzerdefinierte Metriken auf Betriebssystemebene bieten Ihnen zusätzliche Einblicke in die Nutzung von Ressourcen wie virtuellen Speicher undCPU. Sie können diese Informationen verwenden, um die Umgebungsklasse auszuwählen, die am besten zu Ihrer Arbeitslast passt.

Version

  • Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10 verwenden.

Voraussetzungen

Um das Codebeispiel auf dieser Seite zu verwenden, benötigen Sie Folgendes:

Berechtigungen

  • Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

Abhängigkeiten

  • Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Abhängigkeiten erforderlich.

Codebeispiel

  1. Navigieren Sie in der Befehlszeile zu dem Ordner, in dem Ihr DAG Code gespeichert ist. Beispielsweise:

    cd dags
  2. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unterdag-custom-metrics.py. MWAA-ENV-NAMEErsetzen Sie es durch Ihren Umgebungsnamen.

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import os,json,boto3,psutil,socket def publish_metric(client,name,value,cat,unit='None'): environment_name = os.getenv("MWAA_ENV_NAME") value_number=float(value) hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print('writing value',value_number,'to metric',name) response = client.put_metric_data( Namespace='MWAA-Custom', MetricData=[ { 'MetricName': name, 'Dimensions': [ { 'Name': 'Environment', 'Value': environment_name }, { 'Name': 'Category', 'Value': cat }, { 'Name': 'Host', 'Value': ip_address }, ], 'Timestamp': datetime.now(), 'Value': value_number, 'Unit': unit }, ] ) print(response) return response def python_fn(**kwargs): client = boto3.client('cloudwatch') cpu_stats = psutil.cpu_stats() print('cpu_stats', cpu_stats) virtual = psutil.virtual_memory() cpu_times_percent = psutil.cpu_times_percent(interval=0) publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes') publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes') publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes') publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes') publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes') publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes') publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent') publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent') publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent') publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent') return "OK" with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag: t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
  3. Führen Sie den folgenden AWS CLI Befehl aus, um das in den Bucket Ihrer Umgebung DAG zu kopieren, und lösen Sie es dann DAG mithilfe der Apache Airflow-Benutzeroberfläche aus.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Wenn der Vorgang erfolgreich DAG ausgeführt wird, sollten Sie in Ihren Apache Airflow-Protokollen etwas Ähnliches wie das Folgende sehen:

    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    ...
    [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
    [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0