データを Apache Parquet に変換するための 3 つの AWS Glue ETL ジョブタイプ - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

データを Apache Parquet に変換するための 3 つの AWS Glue ETL ジョブタイプ

作成者:Adnan Alvee (AWS), Karthikeyan Ramachandran, and Nith Govindasivan (AWS)

環境:PoC またはパイロット

テクノロジー:分析

ワークロード:その他すべてのワークロード

AWS サービス: AWS Glue

[概要]

Amazon Web Services (AWS) クラウド上で、AWS Glue は完全に管理された抽出、変換、ロード (ETL) サービスです。AWS Glue は、データを分類し、クリーニングし、リッチ化し、様々なデータストアやデータストリーム間で確実に移動させるための費用対効果を実現します。

このパターンでは、AWS Glue でさまざまなジョブタイプが提供され、3 つの異なるスクリプトを使用して ETL ジョブの作成を示しています。

AWS Glue を使用して Python シェル環境で ETL ジョブを記述できます。マネージド Apache Spark 環境で Python (PySpark) または Scala を使用して、バッチ ETL ジョブとストリーミング ETL ジョブの両方を作成することもできます。ETL ジョブの作成を始めるにあたって、このパターンは Python シェル、、および Scala を使用するバッチ ETL ジョブに焦点を当てています。 PySparkPython シェルジョブは、より少ない計算能力を必要とするワークロードを対象としています。マネージド Apache Spark 環境は、高い計算能力を必要とするワークロードを対象としています。

Apache Parquet は、効率的な圧縮とエンコードスキームをサポートするように構築されています。データを列指向に保存するため、分析ワークロードを高速化できます。データを Parquet に変換すると、長期的にはストレージ容量、コスト、時間を節約できます。Parquet について詳しくは、ブログ記事「Apache Parquet:オープンソースの列指向データ形式でヒーローになる方法」をご覧ください。

前提条件と制限

前提条件

  • AWS Identity and Access Management (IAM) ロール (ロールがない場合は、「追加情報」セクションを参照してください)

アーキテクチャ

ターゲットテクノロジースタック

  • AWS Glue

  • Amazon Simple Storage Service (Amazon S3)

  • Apache Parquet

自動化とスケール

  • AWS Glue ワークフロー」は ETL パイプラインの完全自動化をサポートします。

  • データ処理ユニット (DPU) の数またはワーカーのタイプを変更して、水平方向と垂直方向にスケールさせることができます。

ツール

AWS サービス

  • Amazon Simple Storage Service (Amazon S3) は、量にかかわらず、データを保存、保護、取得するのに役立つクラウドベースのオブジェクトストレージサービスです。

  • AWS Glue」は、さまざまなデータストアやデータストリーム間でデータを分類、クリーニング、強化、移動するための完全マネージド型の ETL サービスです。

その他のツール

  • Apache Parquet」は、ストレージとデータの取得を目的として設計されたオープンソースの列指向データファイル形式です。

設定

AWS Glue ETL の計算能力を設定するには、以下の設定を使用してください。コストを削減するには、このパターンで提供されるワークロードを実行するときには最小限の設定を使用してください。 

  • Python シェル — 1 DPU を使用して 16 GB のメモリを利用するか、0.0625 DPU を使用して 1 GB のメモリを利用できます。このパターンでは、AWS Glue コンソールのデフォルトである 0.0625 DPU を使用します。

  • Python または Scala for Spark — コンソールで Spark 関連のジョブタイプを選択した場合、AWS Glue はデフォルトで 10 個のワーカーと G.1X ワーカータイプを使用します。このパターンでは、2 つのワーカー (許容される最小数) と標準のワーカータイプを使用するため、十分かつ費用対効果が高くなります。

次の表は、Apache Spark 環境のさまざまな AWS Glue ワーカータイプを示しています。Python シェルジョブは Apache Spark 環境を使用して Python を実行しないため、このテーブルには含まれていません。

規格

G.1X

G.2X

vCPU

4

4

8

「メモリ」

16 GB

16 GB

32 GB

ディスク容量

50 GB

64 GB

128 GB

ワーカーごとのエグゼキューター

2

1

Code

IAM ロールやパラメータ設定など、このパターンで使用されるコードについては、「追加情報」セクションを参照してください。

エピック

タスク説明必要なスキル

データを新規または既存の S3 バケットにアップロードします。

お使いののアカウント内で新しい S3 バケットを作成するか、既存の S3 バケットを使用します。「添付ファイル」セクションから sample_data.csv ファイルをアップロードし、S3 バケットとプレフィックスの場所を書き留めます。

AWS 全般
タスク説明必要なスキル

AWS Glue ジョブを作成します。

AWS Glue コンソールの ETL セクションで、AWS Glue ジョブを追加します。適切なジョブタイプ、AWS Glue バージョン、対応する DPU/ワーカータイプとワーカー数を選択します。詳細については、「設定」セクションを参照してください。

デベロッパー、クラウド、またはデータ

入力位置と出力位置を変更します。

AWS Glue ジョブに対応するコードをコピーし、「データのアップロード」エピックでメモした入力場所と出力場所を変更します。

デベロッパー、クラウド、またはデータ

パラメータを設定します。

「追加情報」セクションに記載されているスニペットを使用して、ETL ジョブのパラメータを設定できます。AWS Glue は内部で次の 4 つの引数名を使用します。

  • --conf

  • --debug

  • --mode

  • --JOB_NAME

--JOB_NAME パラメータは、AWS Glue コンソールで明示的に入力する必要があります。[ジョブ]、[ジョブの編集]、[セキュリティ設定]、[スクリプトライブラリ]、および [ジョブパラメータ] (オプション) を選択します。--JOB_NAME キーとして入力し、値を提供します。また、AWS コマンドラインインターフェイス (AWS CLI) または AWS Glue API でもこのパラメータを設定できます。--JOB_NAME パラメーターは Spark によって使用され、Python シェル環境のジョブでは必要ありません。

-- をすべてのパラメーター名の前に追加する必要があります。そうしないと、コードは機能しません。たとえば、コードスニペットの場合、ロケーションパラメータは --input_loc--output_loc によって呼び出される必要があります。

デベロッパー、クラウド、またはデータ

ETL ジョブを実行する。

ジョブを実行し、出力を確認します。元のファイルからどれだけの容量が削減されたかに注意してください。

デベロッパー、クラウド、またはデータ

関連リソース

リファレンス

チュートリアルと動画

追加情報

IAM ロール

AWS Glue ジョブを作成するときは、次のコードスニペットに示されている権限を持つ既存の IAM ロールまたは新しいロールを使用できます。

新規ロールを作成するには、次の YAML コードを使用します。

# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]

AWS Glue Python シェル

Python コードでは、Pandas PyArrow とライブラリを使用してデータを Parquet に変換します。Pandas ライブラリは既に使用可能です。 PyArrow このライブラリは 1 回限りの実行なので、パターンを実行するとダウンロードされます。 PyArrow ホイールファイルを使用してライブラリに変換し、そのファイルをライブラリパッケージとして提供できます。wheel ファイルのパッケージングについての詳細は、「独自の Python ライブラリの提供」を参照してください。

AWS Glue Python シェルパラメータ

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])

AWS Glue Python シェルコード

from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1]  s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False)  s3_resource.Object(output_loc_bucket, output_loc_prefix +  'data' + '.parquet').put(Body=parquet_buffer.getvalue())

Python を使った AWS Glue ースパークジョブ

Python で AWS Glue Spark ジョブタイプを使用するには、ジョブタイプとして [Spark] を選択します。AWS Glue バージョンとして、ジョブの起動時間が改善された Spark 3.1、Python 3 (グルーバージョン 3.0) を選択してください。

AWS Glue Python パラメータ

from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])

Python コードを使用した AWS Glue ースパークジョブ

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "bucket-name/prefix/sample_data.csv" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\     connection_type = "s3", \     connection_options = {          "paths": [input_loc]}, \     format = "csv",     format_options={         "withHeader": True,         "separator": ","     }) outputDF = glueContext.write_dynamic_frame.from_options(\     frame = inputDyf, \     connection_type = "s3", \     connection_options = {"path": output_loc \         }, format = "parquet")    

圧縮されたサイズの大きいファイルが多数ある場合 (たとえば、1,000 個のファイルがそれぞれ約 3 MB)、次のコードに示すように、recurse パラメータと compressionType パラメータを組み合わせてプレフィックス内にあるすべてのファイルを読み取ります。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

圧縮された小さなファイルが多数ある場合 (たとえば、それぞれが約 133 KB の 1,000 ファイル)、compressionType パラメーターと recurse パラメーターとともに、groupFiles パラメーターを使用してください。groupFiles パラメータは小さなファイルを複数の大きなファイルにグループ化し、groupSize パラメータは指定されたサイズ (たとえば 1 MB) にグループ化を制御します。次のコードスニペットは、コード内でこれらのパラメーターを使用する例を示しています。

input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )

ワーカーノードを変更しなくても、これらの設定により、AWS Glue ジョブは複数のファイル (大小を問わず、圧縮の有無にかかわらず) を読み取り、Parquet 形式でターゲットに書き込むことができます。

Scala を使った AWS Glue ースパークジョブ

Scala で AWS Glue Spark ジョブタイプを使用するには、ジョブタイプとして [Spark] を選択し、言語として Scala を選択します。AWS Glue バージョンとして、ジョブの起動時間が改善された Spark 3.1、Scala 2 (Glue バージョン 3.0) を選択してください。ストレージ容量を節約するために、次の AWS Glue with Scala サンプルでも applyMapping 機能を使用してデータ型を変換しています。

AWS Glue Scala パラメータ

import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)

Scala コードを使用した AWS Glue ースパークジョブ

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp {   def main(sysArgs: Array[String]) {          @transient val spark: SparkContext = SparkContext.getOrCreate()     val glueContext: GlueContext = new GlueContext(spark)     val inputLoc = "s3://bucket-name/prefix/sample_data.csv"     val outputLoc = "s3://bucket-name/prefix/"     val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame()     val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"),     ("_c2", "string", "profit", "double")), caseSensitive = false)     val formatPartition = applyMapping.toDF().coalesce(1)     val dynamicFrame = DynamicFrame(formatPartition, glueContext)     val dataSink = glueContext.getSinkWithFormat(         connectionType = "s3",          options = JsonOptions(Map("path" -> outputLoc )),         transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame)   } }

添付ファイル

このドキュメントに関連する追加コンテンツにアクセスするには、次のファイルを解凍してください。「attachment.zip