カスタムビジュアルスクリプトの例 - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

カスタムビジュアルスクリプトの例

以下の例で行われる変換はどれも等価です。ただし、2 番目の例 (SparkSQL) が最もクリーンで効率的です。その次が Pandas UDF で、最後は最初の例の低レベルのマッピングです。次の例は、2 つの列を合計する単純な変換の完全な例です。

from awsglue import DynamicFrame # You can have other auxiliary variables, functions or classes on this file, it won't affect the runtime def record_sum(rec, col1, col2, resultCol): rec[resultCol] = rec[col1] + rec[col2] return rec # The number and name of arguments must match the definition on json config file # (expect self which is the current DynamicFrame to transform # If an argument is optional, you need to define a default value here # (resultCol in this example is an optional argument) def custom_add_columns(self, col1, col2, resultCol="result"): # The mapping will alter the columns order, which could be important fields = [field.name for field in self.schema()] if resultCol not in fields: # If it's a new column put it at the end fields.append(resultCol) return self.map(lambda record: record_sum(record, col1, col2, resultCol)).select_fields(paths=fields) # The name we assign on DynamicFrame must match the configured "functionName" DynamicFrame.custom_add_columns = custom_add_columns

次の例は、SparkSQL API を利用した同等の変換です。

from awsglue import DynamicFrame # The number and name of arguments must match the definition on json config file # (expect self which is the current DynamicFrame to transform # If an argument is optional, you need to define a default value here # (resultCol in this example is an optional argument) def custom_add_columns(self, col1, col2, resultCol="result"): df = self.toDF() return DynamicFrame.fromDF( df.withColumn(resultCol, df[col1] + df[col2]) # This is the conversion logic , self.glue_ctx, self.name) # The name we assign on DynamicFrame must match the configured "functionName" DynamicFrame.custom_add_columns = custom_add_columns

次の例では同じ変換を使用していますが、Pandas UDF を使用しているため、プレーン UDF を使用するよりも効率的です。Pandas UDF を書く方法の詳細については、Apache Spark SQL のドキュメントを参照してください。

from awsglue import DynamicFrame import pandas as pd from pyspark.sql.functions import pandas_udf # The number and name of arguments must match the definition on json config file # (expect self which is the current DynamicFrame to transform # If an argument is optional, you need to define a default value here # (resultCol in this example is an optional argument) def custom_add_columns(self, col1, col2, resultCol="result"): @pandas_udf("integer") # We need to declare the type of the result column def add_columns(value1: pd.Series, value2: pd.Series) → pd.Series: return value1 + value2 df = self.toDF() return DynamicFrame.fromDF( df.withColumn(resultCol, add_columns(col1, col2)) # This is the conversion logic , self.glue_ctx, self.name) # The name we assign on DynamicFrame must match the configured "functionName" DynamicFrame.custom_add_columns = custom_add_columns