Utilisation d'un DAG pour écrire des métriques personnalisées dansCloudWatch - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation d'un DAG pour écrire des métriques personnalisées dansCloudWatch

Vous pouvez utiliser l'exemple de code suivant pour écrire un graphe acyclique dirigé (DAG) qui exécute unPythonOperatorpour récupérer des métriques au niveau du système d'exploitation pour un environnement Amazon MWAA. Le DAG publie ensuite les données sous forme de métriques personnalisées sur Amazon.CloudWatch.

Les mesures personnalisées au niveau du système d'exploitation vous offrent une visibilité supplémentaire sur la façon dont les employés de votre environnement utilisent les ressources telles que la mémoire virtuelle et le processeur. Vous pouvez utiliser ces informations pour sélectionnerclasse d'environnementqui convient le mieux à votre charge de travail.

Version

  • Vous pouvez utiliser l'exemple de code de cette page avecApache Airflow v2 et versions ultérieuresdansPython 3.10.

Prérequis

Pour utiliser l'exemple de code de cette page, vous devez disposer des éléments suivants :

Autorisations

  • Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code de cette page.

Dépendances

  • Aucune dépendance supplémentaire n'est requise pour utiliser l'exemple de code de cette page.

Exemple de code

  1. Dans votre invite de commandes, accédez au dossier dans lequel votre code DAG est stocké. Par exemple :

    cd dags
  2. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sousdag-custom-metrics.py. RemplacerMWAA-ENV-NAMEavec le nom de votre environnement.

    from airflow import DAG from airflow.operators.python_operator import PythonOperator from airflow.utils.dates import days_ago from datetime import datetime import os,json,boto3,psutil,socket def publish_metric(client,name,value,cat,unit='None'): environment_name = os.getenv("MWAA_ENV_NAME") value_number=float(value) hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) print('writing value',value_number,'to metric',name) response = client.put_metric_data( Namespace='MWAA-Custom', MetricData=[ { 'MetricName': name, 'Dimensions': [ { 'Name': 'Environment', 'Value': environment_name }, { 'Name': 'Category', 'Value': cat }, { 'Name': 'Host', 'Value': ip_address }, ], 'Timestamp': datetime.now(), 'Value': value_number, 'Unit': unit }, ] ) print(response) return response def python_fn(**kwargs): client = boto3.client('cloudwatch') cpu_stats = psutil.cpu_stats() print('cpu_stats', cpu_stats) virtual = psutil.virtual_memory() cpu_times_percent = psutil.cpu_times_percent(interval=0) publish_metric(client=client, name='virtual_memory_total', cat='virtual_memory', value=virtual.total, unit='Bytes') publish_metric(client=client, name='virtual_memory_available', cat='virtual_memory', value=virtual.available, unit='Bytes') publish_metric(client=client, name='virtual_memory_used', cat='virtual_memory', value=virtual.used, unit='Bytes') publish_metric(client=client, name='virtual_memory_free', cat='virtual_memory', value=virtual.free, unit='Bytes') publish_metric(client=client, name='virtual_memory_active', cat='virtual_memory', value=virtual.active, unit='Bytes') publish_metric(client=client, name='virtual_memory_inactive', cat='virtual_memory', value=virtual.inactive, unit='Bytes') publish_metric(client=client, name='virtual_memory_percent', cat='virtual_memory', value=virtual.percent, unit='Percent') publish_metric(client=client, name='cpu_times_percent_user', cat='cpu_times_percent', value=cpu_times_percent.user, unit='Percent') publish_metric(client=client, name='cpu_times_percent_system', cat='cpu_times_percent', value=cpu_times_percent.system, unit='Percent') publish_metric(client=client, name='cpu_times_percent_idle', cat='cpu_times_percent', value=cpu_times_percent.idle, unit='Percent') return "OK" with DAG(dag_id=os.path.basename(__file__).replace(".py", ""), schedule_interval='*/5 * * * *', catchup=False, start_date=days_ago(1)) as dag: t = PythonOperator(task_id="memory_test", python_callable=python_fn, provide_context=True)
  3. Exécutez la commande suivanteAWS CLIcommande permettant de copier le DAG dans le bucket de votre environnement, puis de déclencher le DAG à l'aide de l'interface utilisateur d'Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Si le DAG fonctionne correctement, vous devriez voir quelque chose de similaire à ce qui suit dans vos journaux Apache Airflow :

    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - cpu_stats scpustats(ctx_switches=3253992384, interrupts=1964237163, soft_interrupts=492328209, syscalls=0)
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 16024199168.0 to metric virtual_memory_total
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'fad289ac-aa51-46a9-8b18-24e4e4063f4d', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 14356287488.0 to metric virtual_memory_available
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '6ef60085-07ab-4865-8abf-dc94f90cab46', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - writing value 1342296064.0 to metric virtual_memory_used
    [2022-08-16, 10:54:46 UTC] {{logging_mixin.py:109}} INFO - {'ResponseMetadata': {'RequestId': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'd5331438-5d3c-4df2-bc42-52dcf8d60a00', 'content-type': 'text/xml', 'content-length': '212', 'date': 'Tue, 16 Aug 2022 17:54:45 GMT'}, 'RetryAttempts': 0}}
    ...
    [2022-08-16, 10:54:46 UTC] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-08-16, 10:54:46 UTC] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=dag-custom-metrics, task_id=memory_test, execution_date=20220816T175444, start_date=20220816T175445, end_date=20220816T175446
    [2022-08-16, 10:54:46 UTC] {{local_task_job.py:154}} INFO - Task exited with return code 0