本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
請注意,AWS Glue 尚未提供 Boto 3 資源 API。目前,只能使用 Boto 3 用戶端 API。
Python 中的 AWS Glue API 名稱
AWS Java 和其他程式設計語言的 Glue API 名稱通常為 CamelCased。但是,從 Python 呼叫時,這些一般名稱會變更為小寫字母,並以底線字元區隔名稱的部分,使其更為「Pythonic」。在 AWS Glue API 參考文件中,這些 Pythonic 名稱會列在一般 CamelCased 名稱後面的刮號中。
但是,雖然 AWS Glue API 名稱本身會轉換為小寫字母,但其參數名稱仍維持大寫字母。請務必記住這一點,因為如下所述,在呼叫 AWS Glue API 時,應以名稱傳遞參數。
在 AWS Glue 中傳遞和存取 Python 參數
在 Python 中呼叫 AWS Glue API,最好以名稱明確傳遞參數。例如:
job = glue.create_job(Name='sample', Role='Glue_DefaultRole',
Command={'Name': 'glueetl',
'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
這可協助您了解 Python 建立字典的名稱/值元組,讓您在 Job 結構 或 JobRun 結構 中將其指定為 ETL 指令碼的引數。Boto 3 會透過 REST API 呼叫,以 JSON 格式將它們傳送到 AWS Glue。這表示您在指令碼中存取這些引數時,無法倚賴引數的排序。
例如,假設您在 Python Lambda 處理常式函式中起始 JobRun
,而且要指定幾個參數。您的程式碼看起來類似如下:
from datetime import datetime, timedelta
client = boto3.client('glue')
def lambda_handler(event, context):
last_hour_date_time = datetime.now() - timedelta(hours = 1)
day_partition_value = last_hour_date_time.strftime("%Y-%m-%d")
hour_partition_value = last_hour_date_time.strftime("%-H")
response = client.start_job_run(
JobName = 'my_test_Job',
Arguments = {
'--day_partition_key': 'partition_0',
'--hour_partition_key': 'partition_1',
'--day_partition_value': day_partition_value,
'--hour_partition_value': hour_partition_value } )
若要在您的 ETL 指令碼中可靠地存取這些參數,請使用 AWS Glue 的 getResolvedOptions
指定其名稱,然後從產生的字典存取這些參數:
import sys
from awsglue.utils import getResolvedOptions
args = getResolvedOptions(sys.argv,
['JOB_NAME',
'day_partition_key',
'hour_partition_key',
'day_partition_value',
'hour_partition_value'])
print "The day partition key is: ", args['day_partition_key']
print "and the day partition value is: ", args['day_partition_value']
如果想傳遞一個巢狀 JSON 字串的引數,以在將參數值傳遞給 AWS Glue ETL 任務時保留參數值,則您必須在開始任務執行之前對參數字串進行編碼,然後在任務指令碼參考其之前對參數字串進行解碼。例如,請試想有下列引數字串:
glue_client.start_job_run(JobName = "gluejobname", Arguments={
"--my_curly_braces_string": '{"a": {"b": {"c": [{"d": {"e": 42}}]}}}'
})
若要正確傳遞此參數,您應該將引數編碼為 Base64 編碼的字串。
import base64
...
sample_string='{"a": {"b": {"c": [{"d": {"e": 42}}]}}}'
sample_string_bytes = sample_string.encode("ascii")
base64_bytes = base64.b64encode(sample_string_bytes)
base64_string = base64_bytes.decode("ascii")
...
glue_client.start_job_run(JobName = "gluejobname", Arguments={
"--my_curly_braces_string": base64_bytes})
...
sample_string_bytes = base64.b64decode(base64_bytes)
sample_string = sample_string_bytes.decode("ascii")
print(f"Decoded string: {sample_string}")
...
範例:建立和執行任務
以下範例顯示如何使用 Python 呼叫 AWS Glue API,以建立和執行 ETL 任務。
建立和執行任務
-
建立 AWS Glue 用戶端執行個體:
import boto3 glue = boto3.client(service_name='glue', region_name='us-east-1', endpoint_url='https://glue.us-east-1.amazonaws.com')
-
建立任務。您必須使用
glueetl
做為 ETL 命令的名稱,如以下程式碼所示:myJob = glue.create_job(Name='sample', Role='Glue_DefaultRole', Command={'Name': 'glueetl', 'ScriptLocation': 's3://my_script_bucket/scripts/my_etl_script.py'})
-
為您在上個步驟建立的任務啟動新的執行:
myNewJobRun = glue.start_job_run(JobName=myJob['Name'])
-
取得任務狀態:
status = glue.get_job_run(JobName=myJob['Name'], RunId=myNewJobRun['JobRunId'])
-
列印任務執行的目前狀態:
print(status['JobRun']['JobRunState'])