設定提醒、部署和排程 - AWS 連接詞

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

設定提醒、部署和排程

本主題說明如何設定 AWS Glue 資料品質的警示、部署和排程。

在 Amazon EventBridge 整合中設定警示和通知

AWS 「Glue 資料品質」支援發佈 EventBridge 事件,這些事件會在資料品質規則集評估執行完成時發出。如此您就可以輕鬆設定資料品質規則失敗時的提醒。

以下是在資料型錄中評估資料品質規則集時的範例事件。有了這些資訊,您可以檢閱 Amazon 提供的資料 EventBridge。您可以發出其他 API 呼叫以取得更多詳細資訊。例如,使用結果 ID 呼叫 get_data_quality_result API,以取得特定執行的詳細資訊。

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_DATA_CATALOG", "runId":"dqrun-12334567890", "databaseName": "db-123", "tableName": "table-123", "catalogId": "123456789012" }, "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00, "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

以下是當您在 AWS Glue ETL 或 AWS Glue Studio 筆記本中評估資料品質規則集時發佈的範例事件。

{ "version":"0", "id":"abcdef00-1234-5678-9abc-def012345678", "detail-type":"Data Quality Evaluation Results Available", "source":"aws.glue-dataquality", "account":"123456789012", "time":"2017-09-07T18:57:21Z", "region":"us-west-2", "resources":[], "detail":{ "context": { "contextType": "GLUE_JOB", "jobId": "jr-12334567890", "jobName": "dq-eval-job-1234", "evaluationContext": "", } "resultID": "dqresult-12334567890", "rulesetNames": ["rulset1"], "state":"SUCCEEDED", "score": 1.00 "rulesSucceeded": 100, "rulesFailed": 0, "rulesSkipped": 0 } }

對於「資料品質」評估同時在「資料目錄」和 ETL 工作中執行,「將量度發佈至 Amazon CloudWatch」選項 (預設為選取) 必須保持選取狀態,才能發 EventBridge 佈才能正常工作。

設定 EventBridge 通知

資料品質屬性 AWS CloudFormation

若要接收發出的事件並定義目標,您必須設定 Amazon EventBridge 規則。若要建立規則:

  1. 打開 Amazon EventBridge 控制台。

  2. 在導覽列的匯流排區段下選擇規則

  3. 選擇 Create Rule (建立規則)。

  4. 定義規則詳細資訊上:

    1. 對於名稱,輸入 myDQRule

    2. 輸入描述 (選用)。

    3. 對於事件匯流排,請選取您的事件匯流排。如果沒有事件匯流排,請保留其預設值。

    4. 對於規則類型,選取具有事件模式的規則,然後選擇下一步

  5. 建置事件模式上:

    1. 對於事件來源,請選取AWS 事件或 EventBridge 夥伴事件。

    2. 略過示範事件區段。

    3. 對於建立方法,選取使用模式表單

    4. 對於事件模式:

      1. 選取事件來源的 AWS 服務

      2. 選取「Glue 合資料品質」進行 AWS 維修。

      3. 對於事件類型,選取可用的資料品質評估結果

      4. 對於特定狀態,選取失敗。然後您會看到類似以下內容的事件模式:

        { "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "state": ["FAILED"] } }
      5. 如需更多設定選項,請參閱 事件模式的其他組態選項

  6. 選取目標上:

    1. 對於目標類型,選取 AWS 服務

    2. 使用 [選取目標] 下拉式清單選擇您想要連線的 AWS 服務 (SNS、Lambda、SQS 等),然後選擇 [下一步]。

  7. 設定標籤上按一下新增標籤以新增選用標籤,然後選擇下一步

  8. 您會看到所有選取項目的摘要頁面。選擇底部的建立規則

事件模式的其他組態選項

除了根據成功或失敗篩選事件之外,您可能還想要根據不同參數進一步篩選事件。

若要這麼做,請前往「事件模式」區段,然後選取編輯模式以指定其他參數。請注意,事件模式中的欄位需區分大小寫。以下是設定事件模式的範例。

若要從評估特定規則集的特定資料表擷取事件,請使用此類型的模式:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_DATA_CATALOG"], "databaseName": "db-123", "tableName": "table-123", }, "rulesetNames": ["ruleset1", "ruleset2"] "state": ["FAILED"] } }

若要從 ETL 體驗中的特定任務擷取事件,請使用此類型的模式:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "context": { "contextType": ["GLUE_JOB"], "jobName": ["dq_evaluation_job1", "dq_evaluation_job2"] }, "state": ["FAILED"] } }

若要擷取分數低於特定閾值 (例如 70%) 的事件:

{ "source": ["aws.glue-dataquality"], "detail-type": ["Data Quality Evaluation Results Available"], "detail": { "score": [{ "numeric": ["<=", 0.7] }] } }

將通知格式化為電子郵件

您有時需要向業務團隊傳送格式良好的電子郵件通知。您可以使用 Amazon EventBridge 和 AWS Lambda 來實現這一目標。

格式化為電子郵件的資料品質通知

下列範例程式碼可用來格式化資料品質通知以產生電子郵件。

import boto3 import json from datetime import datetime sns_client = boto3.client('sns') glue_client = boto3.client('glue') sns_topic_arn = 'arn:aws:sns:<region-code>:<account-id>:<sns-topic-name>' def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info sns_client.publish( TopicArn=sns_topic_arn, Message=message_text, Subject=subject_text ) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

在 CloudWatch 整合中設定警示和通知

我們建議的方法是使用 Amazon 設定資料品質提醒 EventBridge,因為 Amazon EventBridge 需要一次性設定來提醒客戶。但是, CloudWatch 由於熟悉,一些客戶更喜歡 Amazon。對於此類客戶,我們提供與 Amazon 的集成 CloudWatch。

每個「 AWS Glue 資料品質」評估都會在每次資料品質執行時發出一對名為 glue.data.quality.rules.passed (表示通過的規則數目) 和 glue.data.quality.rules.failed (指出失敗規則的數目) 的測量結果。您可以使用發出的此指標來建立警示,以在指定的資料品質執行低於閾值時提醒使用者。若要開始設定將透過 Amazon SNS 通知傳送電子郵件的警示,請依照以下步驟操作:

若要開始設定將透過 Amazon SNS 通知傳送電子郵件的警示,請依照以下步驟操作:

  1. 打開 Amazon CloudWatch 控制台。

  2. 選擇指標下的所有指標。您將在標題為 "Glue Data Quality" 的自訂命名空間下看到額外的命名空間。

    注意

    開始執行 AWS Glue 資料品質時,請確定已啟用「將指標發佈到 Amazon」 CloudWatch 核取方塊。否則,該特定運行的指標將不會發佈到 Amazon CloudWatch。

    Glue Data Quality 命名空間下,您可以看到每個資料表中依規則集發出的指標。本主題的目的在於,如果此值超過 1 (表示如果我們看到失敗規則評估的數量大於 1,我們希望收到通知),我們將使用 glue.data.quality.rules.failed 規則和警示。

  3. 若要建立警示,請選擇警示下的所有警示

  4. 選擇 Create alarm (建立警示)。

  5. 選擇 Select metric (選取指標)。

  6. 選取與您建立之資料表對應的 glue.data.quality.rules.failed 指標,然後選擇選取指標

  7. 指標區段下的指定指標和條件索引標籤下:

    1. Statistic (統計資料) 中選擇 Sum (總和)

    2. 對於期間,選擇 1 分鐘

  8. 條件區段下:

    1. 對於閾值類型,選擇靜態

    2. 對於每當 glue.data.quality.rules.failed 為…,選取大於/等於

    3. 對於比…,輸入 1 作為閾值。

    這些選擇意味著,如果 glue.data.quality.rules.failed 指標發出的值大於或等於 1,我們將觸發警示。但是,如果沒有資料,我們會將其視為可接受。

  9. 選擇下一步

  10. 設定動作上:

    1. 對於警示狀態觸發區段,選擇警示中

    2. 對於將通知傳送至下列 SNS 主題區段,選擇建立新主題以透過新的 SNS 主題傳送通知

    3. 對於將接收通知的電子郵件端點,輸入電子郵件地址。然後按一下建立主題

    4. 選擇下一步

  11. 對於警示名稱,輸入 myFirstDQAlarm,然後選擇下一步

  12. 您會看到所有選取項目的摘要頁面。選擇底部的建立警示

現在,您可以從 Amazon CloudWatch 警報儀表板看到正在創建的警報。

查詢資料品質結果以建置儀表板

您可能想要建置儀表板以顯示資料品質結果。有兩種方式可以進行:

EventBridge 使用以下代碼設置 Amazon 以將數據寫入 Amazon S3:

import boto3 import json from datetime import datetime s3_client = boto3.client('s3') glue_client = boto3.client('glue') s3_bucket = 's3-bucket-name' def write_logs(log_metadata): try: filename = datetime.now().strftime("%m%d%Y%H%M%S") + ".json" key_opts = { 'year': datetime.now().year, 'month': "{:02d}".format(datetime.now().month), 'day': "{:02d}".format(datetime.now().day), 'filename': filename } s3key = "gluedataqualitylogs/year={year}/month={month}/day={day}/{filename}".format(**key_opts) s3_client.put_object(Bucket=s3_bucket, Key=s3key, Body=json.dumps(log_metadata)) except Exception as e: print(f'Error writing logs to S3: {e}') def lambda_handler(event, context): log_metadata = {} message_text = "" subject_text = "" if event['detail']['context']['contextType'] == 'GLUE_DATA_CATALOG': log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['tableName'] = str(event['detail']['context']['tableName']) log_metadata['databaseName'] = str(event['detail']['context']['databaseName']) log_metadata['runId'] = str(event['detail']['context']['runId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_table_name: {}\n".format(log_metadata['tableName']) message_text += "glue_database_name: {}\n".format(log_metadata['databaseName']) message_text += "run_id: {}\n".format(log_metadata['runId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) else: log_metadata['ruleset_name'] = str(event['detail']['rulesetNames'][0]) log_metadata['jobName'] = str(event['detail']['context']['jobName']) log_metadata['jobId'] = str(event['detail']['context']['jobId']) log_metadata['resultId'] = str(event['detail']['resultId']) log_metadata['state'] = str(event['detail']['state']) log_metadata['score'] = str(event['detail']['score']) log_metadata['numRulesSucceeded'] = str(event['detail']['numRulesSucceeded']) log_metadata['numRulesFailed'] = str(event['detail']['numRulesFailed']) log_metadata['numRulesSkipped'] = str(event['detail']['numRulesSkipped']) message_text += "Glue Data Quality run details:\n" message_text += "ruleset_name: {}\n".format(log_metadata['ruleset_name']) message_text += "glue_job_name: {}\n".format(log_metadata['jobName']) message_text += "job_id: {}\n".format(log_metadata['jobId']) message_text += "result_id: {}\n".format(log_metadata['resultId']) message_text += "state: {}\n".format(log_metadata['state']) message_text += "score: {}\n".format(log_metadata['score']) message_text += "numRulesSucceeded: {}\n".format(log_metadata['numRulesSucceeded']) message_text += "numRulesFailed: {}\n".format(log_metadata['numRulesFailed']) message_text += "numRulesSkipped: {}\n".format(log_metadata['numRulesSkipped']) subject_text = "Glue Data Quality ruleset {} run details".format(log_metadata['ruleset_name']) resultID = str(event['detail']['resultId']) response = glue_client.get_data_quality_result(ResultId=resultID) RuleResults = response['RuleResults'] message_text += "\n\nruleset details evaluation steps results:\n\n" subresult_info = [] for dic in RuleResults: subresult = "Name: {}\t\tResult: {}\t\tDescription: \t{}".format(dic['Name'], dic['Result'], dic['Description']) if 'EvaluationMessage' in dic: subresult += "\t\tEvaluationMessage: {}".format(dic['EvaluationMessage']) subresult_info.append({ 'Name': dic['Name'], 'Result': dic['Result'], 'Description': dic['Description'], 'EvaluationMessage': dic.get('EvaluationMessage', '') }) message_text += "\n" + subresult log_metadata['resultrun'] = subresult_info write_logs(log_metadata) return { 'statusCode': 200, 'body': json.dumps('Message published to SNS topic') }

寫入 Amazon S3 之後,您可以使用 AWS Glue 爬蟲程式向 Athena 註冊並查詢資料表。

在資料品質評估期間設定 Amazon S3 位置:

在 Glue 資料型錄或 AWS AWS Glue ETL 中執行資料品質任務時,您可以提供 Amazon S3 位置,將資料品質結果寫入 Amazon S3。您可以使用以下語法,透過參考目標以讀取資料品質結果來建立資料表。

請注意,您必須分別執行 CREATE EXTERNAL TABLEMSCK REPAIR TABLE 查詢。

CREATE EXTERNAL TABLE <my_table_name>( catalogid string, databasename string, tablename string, dqrunid string, evaluationstartedon timestamp, evaluationcompletedon timestamp, rule string, outcome string, failurereason string, evaluatedmetrics string) PARTITIONED BY ( `year` string, `month` string, `day` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://glue-s3-dq-bucket-us-east-2-results/' TBLPROPERTIES ( 'classification'='json', 'compressionType'='none', 'typeOfData'='file');
MSCK REPAIR TABLE <my_table_name>;

建立上述資料表後,您便可以使用 Amazon Athena 執行分析查詢。

使用部署資料品質規則 AWS CloudFormation

您可以使用 AWS CloudFormation 來建立資料品質規則。如需詳細資訊,請參閱〈AWS CloudFormation AWS Glue〉。

排程資料品質規則

您可以使用下列方法排程資料品質規則:

  • 從「資料目錄」排程資料品質規則:沒有程式碼使用者可以使用此選項輕鬆排定其資料品質掃描的時間。 AWS Glue 數據質量將在 Amazon 創建時間表 EventBridge。若要排程資料品質規則:

    • 導覽至規則集,然後按一下執行

    • 執行頻率中,選取所需排程並提供任務名稱。此任務名稱是您在中的排程名稱 EventBridge。

  • 使用 Amazon EventBridge 和 AWS Step Functions 數來協調資料品質規則的評估和建議。