Using APIs to measure and manage data quality - AWS Glue

Using APIs to measure and manage data quality

This topic describes how to use APIs to measure and manage data quality.

Prerequisites

  • Make sure your boto3 version is up to date so that it includes the latest AWS Glue Data Quality APIs.

  • Make sure your AWS CLI version is up to date so that it includes the latest AWS Glue Data Quality commands.

If you use an AWS Glue job to run these APIs, you can use the following job option to update the boto3 library to the latest version:

--additional-python-modules boto3==<version>
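As a minimal sketch of how that option could be assembled programmatically, the helper below builds a job-arguments dictionary. The function name `build_default_arguments` and the version strings are hypothetical placeholders, not part of the AWS documentation; the only AWS-defined piece is the `--additional-python-modules` key, which takes a comma-separated list of modules.

```python
def build_default_arguments(boto3_version, extra_modules=()):
    """Return job arguments that pin boto3 (plus optional extra modules).

    Hypothetical helper: the versions passed in are placeholders you choose.
    """
    modules = [f"boto3=={boto3_version}", *extra_modules]
    # AWS Glue reads this option as a comma-separated list of pip requirements.
    return {"--additional-python-modules": ",".join(modules)}


if __name__ == "__main__":
    # Placeholder versions, for illustration only.
    print(build_default_arguments("1.34.0", extra_modules=["pyarrow==14.0.1"]))
```

The resulting dictionary could be supplied as the job's default arguments when you create or update the job, or as per-run arguments when you start a job run.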

Working with AWS Glue Data Quality recommendations

To start an AWS Glue Data Quality recommendation run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 Glue client.
            """
            self.glue_client = glue_client

        def start_data_quality_rule_recommendation_run(self, database_name, table_name, role_arn):
            """
            Starts a recommendation run that is used to generate rules when you don't
            know what rules to write. AWS Glue Data Quality analyzes the data and comes
            up with recommendations for a potential ruleset. You can then triage the
            ruleset and modify the generated ruleset to your liking.

            :param database_name: The name of the AWS Glue database which contains the dataset.
            :param table_name: The name of the AWS Glue table against which we want a recommendation.
            :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access
                             Management (IAM) role that grants permission to let AWS Glue
                             access the resources it needs.
            """
            try:
                response = self.glue_client.start_data_quality_rule_recommendation_run(
                    DataSource={
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    },
                    Role=role_arn
                )
            except ClientError as err:
                logger.error(
                    "Couldn't start data quality recommendation run for %s. Here's why: %s: %s",
                    table_name, err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response['RunId']

When you start a recommendation run, you can use pushDownPredicate or catalogPartitionPredicate to improve performance and run recommendations only on specific partitions of your catalog source.

    import boto3

    client = boto3.client('glue')

    # Limit the recommendation run to the year=2022 partition.
    response = client.start_data_quality_rule_recommendation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': database_name,
                'TableName': table_name,
                'AdditionalOptions': {
                    'pushDownPredicate': "year=2022"
                }
            }
        },
        Role=role_arn,
        NumberOfWorkers=2,
        CreatedRulesetName='<rule_set_name>'
    )
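The partition options can be easier to manage if the DataSource dictionary is built in one place. The following is a small sketch under that assumption; `glue_table_source` is a hypothetical helper name, and it only produces the dictionary shape used in the calls in this topic.

```python
def glue_table_source(database_name, table_name,
                      push_down_predicate=None,
                      catalog_partition_predicate=None):
    """Build a DataSource dict, adding partition predicates only when given."""
    glue_table = {"DatabaseName": database_name, "TableName": table_name}
    options = {}
    if push_down_predicate:
        options["pushDownPredicate"] = push_down_predicate
    if catalog_partition_predicate:
        options["catalogPartitionPredicate"] = catalog_partition_predicate
    if options:
        # AdditionalOptions is omitted entirely when no predicate is set.
        glue_table["AdditionalOptions"] = options
    return {"GlueTable": glue_table}


if __name__ == "__main__":
    print(glue_table_source("sales_db", "orders", push_down_predicate="year=2022"))
```

The returned value could then be passed as the DataSource argument of a recommendation run or an evaluation run.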

To get the results of an AWS Glue Data Quality recommendation run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def get_data_quality_rule_recommendation_run(self, run_id):
            """
            Gets the specified recommendation run that was used to generate rules.

            :param run_id: The ID of the data quality recommendation run.
            """
            try:
                response = self.glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
            except ClientError as err:
                logger.error(
                    "Couldn't get data quality recommendation run %s. Here's why: %s: %s",
                    run_id, err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response
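A recommendation run is asynchronous, so the recommended ruleset is only available once the run has finished. The sketch below polls the run until it reaches a terminal status. The function name, the polling interval, and the retry limit are assumptions chosen for illustration; the status values are the ones the `Status` field of the response is expected to report.

```python
import time

# Statuses after which the run will not change any more.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT"}


def wait_for_recommendation(glue_client, run_id, poll_seconds=30,
                            max_polls=60, sleep=time.sleep):
    """Poll a recommendation run and return its RecommendedRuleset on success.

    Hypothetical helper: raises RuntimeError if the run ends unsuccessfully and
    TimeoutError if it is still in progress after max_polls checks.
    """
    for _ in range(max_polls):
        response = glue_client.get_data_quality_rule_recommendation_run(RunId=run_id)
        status = response["Status"]
        if status == "SUCCEEDED":
            return response["RecommendedRuleset"]
        if status in TERMINAL_STATES:
            detail = response.get("ErrorString", "")
            raise RuntimeError(f"Run {run_id} ended with status {status}: {detail}")
        sleep(poll_seconds)
    raise TimeoutError(f"Run {run_id} did not finish after {max_polls} polls")
```

The `sleep` parameter exists only so the wait can be replaced in tests; in normal use you would pass a Boto3 AWS Glue client and the run ID returned when the run was started.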

From the response object above, you can extract the ruleset recommended by the run, to use in later steps:

    print(response['RecommendedRuleset'])

Example output:

    Rules = [
        RowCount between 2000 and 8000,
        IsComplete "col1",
        IsComplete "col2",
        StandardDeviation "col3" between 58138330.8 and 64258155.09,
        ColumnValues "col4" between 1000042965 and 1214474826,
        IsComplete "col5"
    ]
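To triage a recommended ruleset, it can help to break the string into individual rules first. The following is a simplified sketch, not a full DQDL parser: `split_rules` is a hypothetical helper that assumes the string contains a single `Rules = [...]` section, splits on commas that are outside quotes and nested brackets, and does not handle escaped quotes.

```python
def split_rules(ruleset):
    """Split a 'Rules = [...]' string into a list of individual rule strings."""
    body = ruleset[ruleset.index("[") + 1:ruleset.rindex("]")]
    rules, current, depth, in_quotes = [], [], 0, False
    for ch in body:
        if ch == '"':
            in_quotes = not in_quotes
        elif not in_quotes and ch in "[(":
            depth += 1
        elif not in_quotes and ch in "])":
            depth -= 1
        if ch == "," and depth == 0 and not in_quotes:
            # A top-level comma ends the current rule.
            rules.append("".join(current).strip())
            current = []
        else:
            current.append(ch)
    tail = "".join(current).strip()
    if tail:
        rules.append(tail)
    return rules


if __name__ == "__main__":
    sample = 'Rules = [ RowCount between 2000 and 8000, IsComplete "col1" ]'
    for rule in split_rules(sample):
        print(rule)
```

After reviewing the list, you could drop or edit entries and join the remainder back into a `Rules = [...]` string before creating a ruleset.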

To list all of your recommendation runs, optionally filtered by data source:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.list_data_quality_rule_recommendation_runs(
        Filter={
            'DataSource': {
                'GlueTable': {
                    'DatabaseName': '<database_name>',
                    'TableName': '<table_name>'
                }
            }
        }
    )
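List calls return results one page at a time, with a `NextToken` value when more pages remain. The generator below is a minimal sketch for walking all pages; `iter_runs` is a hypothetical helper, and it assumes the response stores its entries under the `Runs` key, as the run-listing calls in this topic do.

```python
def iter_runs(list_method, **kwargs):
    """Yield every entry under 'Runs' across all pages of a list call.

    list_method is a bound client method, for example
    client.list_data_quality_rule_recommendation_runs.
    """
    while True:
        page = list_method(**kwargs)
        yield from page.get("Runs", [])
        token = page.get("NextToken")
        if not token:
            return
        # Ask for the next page on the following iteration.
        kwargs["NextToken"] = token
```

For example, `for run in iter_runs(client.list_data_quality_rule_recommendation_runs, Filter=my_filter): ...` would visit every matching run, where `my_filter` is a filter dictionary like the one shown above.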

To cancel an existing AWS Glue Data Quality recommendation run:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.cancel_data_quality_rule_recommendation_run(
        RunId='dqrun-d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
    )

Working with AWS Glue Data Quality rulesets

To create an AWS Glue Data Quality ruleset:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.create_data_quality_ruleset(
        Name='<ruleset_name>',
        Ruleset='Rules = [IsComplete "col1", IsPrimaryKey "col2", RowCount between 2000 and 8000]',
        TargetTable={
            'TableName': '<table_name>',
            'DatabaseName': '<database_name>'
        }
    )
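When the rules come from a list (for example, after reviewing recommendations), the ruleset string can be assembled rather than typed by hand. This is a small sketch; `build_ruleset` is a hypothetical helper that only formats the string and does not validate the rules themselves.

```python
def build_ruleset(rules):
    """Join individual rule strings into a single 'Rules = [...]' string."""
    cleaned = [rule.strip() for rule in rules if rule.strip()]
    if not cleaned:
        raise ValueError("A ruleset needs at least one rule")
    return "Rules = [" + ", ".join(cleaned) + "]"


if __name__ == "__main__":
    print(build_ruleset([
        'IsComplete "col1"',
        'IsPrimaryKey "col2"',
        "RowCount between 2000 and 8000",
    ]))
```

The returned string could be passed as the Ruleset argument when creating or updating a ruleset.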

To get a data quality ruleset:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.get_data_quality_ruleset(
        Name='<ruleset_name>'
    )
    print(response)

You can then extract the ruleset string from the response:

    print(response['Ruleset'])

To list all data quality rulesets:

    response = client.list_data_quality_rulesets()

You can use the filter condition in the API to return only the rulesets attached to a specific database or table:

    response = client.list_data_quality_rulesets(
        Filter={
            'TargetTable': {
                'TableName': '<table_name>',
                'DatabaseName': '<database_name>'
            }
        },
    )

To update a data quality ruleset:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def update_data_quality_ruleset(self, ruleset_name, ruleset_string):
            """
            Updates an AWS Glue Data Quality ruleset.

            :param ruleset_name: The name of the AWS Glue Data Quality ruleset to update.
            :param ruleset_string: The DQDL ruleset string to update the ruleset with.
            """
            try:
                response = self.glue_client.update_data_quality_ruleset(
                    Name=ruleset_name,
                    Ruleset=ruleset_string
                )
            except ClientError as err:
                logger.error(
                    "Couldn't update the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response
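Because an update supplies the whole ruleset string, adding one rule means reading the current string, changing it, and writing it back. The sketch below shows that read-modify-write pattern. `append_rule` is a hypothetical helper, and it assumes the stored string contains a single `Rules = [...]` section.

```python
def append_rule(glue_client, ruleset_name, new_rule):
    """Append one rule to an existing ruleset and return the updated string."""
    current = glue_client.get_data_quality_ruleset(Name=ruleset_name)["Ruleset"]
    # Keep everything between the outermost brackets of the Rules section.
    existing = current[current.index("[") + 1:current.rindex("]")].strip()
    if existing:
        updated = f"Rules = [{existing}, {new_rule}]"
    else:
        updated = f"Rules = [{new_rule}]"
    glue_client.update_data_quality_ruleset(Name=ruleset_name, Ruleset=updated)
    return updated
```

This sketch does not guard against two callers updating the same ruleset at once; the last write would win.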

To delete a data quality ruleset:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def delete_data_quality_ruleset(self, ruleset_name):
            """
            Deletes an AWS Glue Data Quality ruleset.

            :param ruleset_name: The name of the AWS Glue Data Quality ruleset to delete.
            """
            try:
                response = self.glue_client.delete_data_quality_ruleset(
                    Name=ruleset_name
                )
            except ClientError as err:
                logger.error(
                    "Couldn't delete the AWS Glue Data Quality ruleset. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

Working with AWS Glue Data Quality runs

To start an AWS Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def start_data_quality_ruleset_evaluation_run(self, database_name, table_name, role_arn, ruleset_list):
            """
            Starts an AWS Glue Data Quality evaluation run.

            :param database_name: The name of the AWS Glue database which contains the dataset.
            :param table_name: The name of the AWS Glue table against which we want to evaluate.
            :param role_arn: The Amazon Resource Name (ARN) of an AWS Identity and Access
                             Management (IAM) role that grants permission to let AWS Glue
                             access the resources it needs.
            :param ruleset_list: The list of AWS Glue Data Quality ruleset names to evaluate.
            """
            try:
                response = self.glue_client.start_data_quality_ruleset_evaluation_run(
                    DataSource={
                        'GlueTable': {
                            'DatabaseName': database_name,
                            'TableName': table_name
                        }
                    },
                    Role=role_arn,
                    RulesetNames=ruleset_list
                )
            except ClientError as err:
                logger.error(
                    "Couldn't start the AWS Glue Data Quality run. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response['RunId']

Remember that you can pass a pushDownPredicate or catalogPartitionPredicate parameter to make sure your data quality run targets only a specific set of partitions in your catalog table. For example:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.start_data_quality_ruleset_evaluation_run(
        DataSource={
            'GlueTable': {
                'DatabaseName': '<database_name>',
                'TableName': '<table_name>',
                'AdditionalOptions': {
                    'pushDownPredicate': 'year=2023'
                }
            }
        },
        Role='<role_name>',
        NumberOfWorkers=5,
        Timeout=123,
        AdditionalRunOptions={
            'CloudWatchMetricsEnabled': False
        },
        RulesetNames=[
            '<ruleset_name>',
        ]
    )

To get information about an AWS Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def get_data_quality_ruleset_evaluation_run(self, run_id):
            """
            Gets details about an AWS Glue Data Quality run.

            :param run_id: The AWS Glue Data Quality run ID to look up.
            """
            try:
                response = self.glue_client.get_data_quality_ruleset_evaluation_run(
                    RunId=run_id
                )
            except ClientError as err:
                logger.error(
                    "Couldn't look up the AWS Glue Data Quality run ID. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

To get the results of an AWS Glue Data Quality run:

For a given AWS Glue Data Quality run, you can extract the results of the run's evaluation as follows:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.get_data_quality_ruleset_evaluation_run(
        RunId='d4b6b01957fdd79e59866365bf9cb0e40fxxxxxxx'
    )

    # Look up the first result produced by the run.
    resultID = response['ResultIds'][0]

    response = client.get_data_quality_result(
        ResultId=resultID
    )

    print(response['RuleResults'])
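The example above reads only the first entry of `ResultIds`. A run can return more than one result ID (for instance, when it evaluates several rulesets), so a sketch that gathers the rule results for every ID may be useful. `collect_rule_results` is a hypothetical helper name.

```python
def collect_rule_results(glue_client, run_id):
    """Return the RuleResults entries for every result ID of an evaluation run."""
    run = glue_client.get_data_quality_ruleset_evaluation_run(RunId=run_id)
    rule_results = []
    for result_id in run.get("ResultIds", []):
        result = glue_client.get_data_quality_result(ResultId=result_id)
        rule_results.extend(result.get("RuleResults", []))
    return rule_results
```

If the run has not finished yet, `ResultIds` may be missing or empty, in which case this helper returns an empty list.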

To list all of your AWS Glue Data Quality runs:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def list_data_quality_ruleset_evaluation_runs(self, database_name, table_name):
            """
            Lists all the AWS Glue Data Quality runs against a given table.

            :param database_name: The name of the database that contains the table.
            :param table_name: The name of the table against which the data quality runs were created.
            """
            try:
                response = self.glue_client.list_data_quality_ruleset_evaluation_runs(
                    Filter={
                        'DataSource': {
                            'GlueTable': {
                                'DatabaseName': database_name,
                                'TableName': table_name
                            }
                        }
                    }
                )
            except ClientError as err:
                logger.error(
                    "Couldn't list the AWS Glue Data Quality runs. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

You can modify the filter clause to show only runs within a specific time window or runs against specific tables.
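As a sketch of a time-bounded filter, the helper below adds optional start-time bounds to the table filter. The `StartedAfter` and `StartedBefore` keys reflect my understanding of the evaluation-run filter shape and should be checked against the current API reference; `evaluation_run_filter` itself is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone


def evaluation_run_filter(database_name, table_name,
                          started_after=None, started_before=None):
    """Build a Filter dict for listing evaluation runs against one table."""
    run_filter = {
        "DataSource": {
            "GlueTable": {"DatabaseName": database_name, "TableName": table_name}
        }
    }
    # Time bounds are assumed filter keys; they are added only when provided.
    if started_after is not None:
        run_filter["StartedAfter"] = started_after
    if started_before is not None:
        run_filter["StartedBefore"] = started_before
    return run_filter


if __name__ == "__main__":
    last_week = datetime.now(timezone.utc) - timedelta(days=7)
    print(evaluation_run_filter("sales_db", "orders", started_after=last_week))
```

The resulting dictionary could be passed as the Filter argument of the list call shown above.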

To stop an ongoing AWS Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def cancel_data_quality_ruleset_evaluation_run(self, run_id):
            """
            Cancels a given AWS Glue Data Quality run.

            :param run_id: The run ID of the AWS Glue Data Quality run to cancel.
            """
            try:
                # The cancel API identifies the run by its run ID, not by a result ID.
                response = self.glue_client.cancel_data_quality_ruleset_evaluation_run(
                    RunId=run_id
                )
            except ClientError as err:
                logger.error(
                    "Couldn't cancel the AWS Glue Data Quality run. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

Working with AWS Glue Data Quality results

To get the results of your AWS Glue Data Quality run:

    import logging

    from botocore.exceptions import ClientError

    logger = logging.getLogger(__name__)

    class GlueWrapper:
        """Encapsulates AWS Glue actions."""

        def __init__(self, glue_client):
            """
            :param glue_client: A Boto3 AWS Glue client.
            """
            self.glue_client = glue_client

        def get_data_quality_result(self, result_id):
            """
            Gets an AWS Glue Data Quality result.

            :param result_id: The result ID of an AWS Glue Data Quality run.
            """
            try:
                response = self.glue_client.get_data_quality_result(
                    ResultId=result_id
                )
            except ClientError as err:
                logger.error(
                    "Couldn't get the AWS Glue Data Quality result. Here's why: %s: %s",
                    err.response['Error']['Code'], err.response['Error']['Message'])
                raise
            else:
                return response

To get results starting from a run ID:

Given an AWS Glue Data Quality run ID, you can extract the result ID and then get the actual results, as shown below:

    # client is a Boto3 AWS Glue client, for example boto3.client('glue').
    response = client.get_data_quality_ruleset_evaluation_run(
        RunId='dqrun-abca77ee126abe1378c1da1ae0750xxxxxxxx'
    )

    resultID = response['ResultIds'][0]

    response = client.get_data_quality_result(
        ResultId=resultID
    )

    print(response['RuleResults'])
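Once you have the `RuleResults` list, a short summary is often more useful than the raw output. The following sketch counts outcomes and collects the names of rules that did not pass. `summarize_rule_results` is a hypothetical helper, and it assumes each entry carries `Name` and `Result` fields with `Result` set to a value such as PASS, FAIL, or ERROR.

```python
from collections import Counter


def summarize_rule_results(rule_results):
    """Return (counts per Result value, names of rules that did not pass)."""
    counts = Counter(entry.get("Result", "UNKNOWN") for entry in rule_results)
    not_passed = [
        entry.get("Name", "<unnamed>")
        for entry in rule_results
        if entry.get("Result") != "PASS"
    ]
    return dict(counts), not_passed


if __name__ == "__main__":
    # Made-up entries, for illustration only.
    example = [
        {"Name": "Rule_1", "Result": "PASS"},
        {"Name": "Rule_2", "Result": "FAIL"},
    ]
    print(summarize_rule_results(example))
```

A caller could, for example, fail a pipeline step when the second return value is not empty.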