AWS Glue
開発者ガイド

Spark SQL ジョブの AWS Glue データカタログ サポート

AWS Glue データカタログ は Apache Hive メタストア互換カタログです。データカタログ を外部の Apache Hive メタストアとして使用するように AWS Glue ジョブと開発エンドポイントを設定できます。その後、データカタログ に格納されているテーブルに対して Apache Spark SQL クエリを直接実行できます。AWS Glue の動的フレームはデフォルトで データカタログ と統合されています。ただし、この機能を使用すると、Spark SQL ジョブは データカタログ を外部の Hive メタストアとして使用を開始できます。

ジョブ引数と開発エンドポイント引数にそれぞれ "--enable-glue-datacatalog": "" 引数を追加することで、AWS Glue ジョブと開発エンドポイントを設定できます。この引数を渡すと、Spark で外部 Hive メタストアとして データカタログ にアクセスできるようにする特定の設定が設定されます。また、AWS Glue ジョブまたは開発エンドポイントで作成された SparkSession オブジェクトで Hive サポートを有効にします

データカタログ アクセスを有効にするには、コンソールの [ジョブの追加] または [エンドポイントの追加] ページの [Catalog options (カタログオプション)] グループにある [Use Glue Data Catalog as the Hive metastore (Glue データカタログを Hive メタストアとして使用する)] チェックボックスをオンにします。ジョブまたは開発エンドポイントに使用される IAM ロールには glue:CreateDatabase アクセス許可が必要です。存在しない場合、「default」というデータベースが データカタログ に作成されます。

Spark SQL ジョブでこの機能の使用方法の例を見てみましょう。次の例では、s3://awsglue-datasets/examples/us-legislators にある米国国会議員のデータセットをクロールしたと想定しています。

Glue データカタログで定義されたテーブルからデータをシリアル化/逆シリアル化するには、Spark SQL は Spark ジョブのクラスパスにある Glue データカタログで定義された形式の Hive SerDe クラスが必要です。

特定の一般的なフォーマットの SerDes は AWS Glue によって配布されています。以下はこれらの Amazon S3 リンクです。

JSON SerDe を追加の JAR として開発エンドポイントに追加します。ジョブでは、引数フィールドで --extra-jars 引数を使用して SerDe を追加できます。詳細については、「AWS Glue で使用される特別なパラメータ」を参照してください。

これは、Spark SQL 用に データカタログ を有効にして開発エンドポイントを作成するための入力 JSON の例です。

{ "EndpointName": "Name", "RoleArn": "role_ARN", "PublicKey": "public_key_contents", "NumberOfNodes": 2, "Arguments": { "--enable-glue-datacatalog": "" }, "ExtraJarsS3Path": "s3://crawler-public/json/serde/json-serde.jar" }

Spark SQL を使用して、米国国会議員のデータセットから作成されたテーブルにクエリを実行します。

>>> spark.sql("use legislators") DataFrame[] >>> spark.sql("show tables").show() +-----------+------------------+-----------+ | database| tableName|isTemporary| +-----------+------------------+-----------+ |legislators| areas_json| false| |legislators| countries_json| false| |legislators| events_json| false| |legislators| memberships_json| false| |legislators|organizations_json| false| |legislators| persons_json| false| +-----------+------------------+-----------+ >>> spark.sql("describe memberships_json").show() +--------------------+---------+-----------------+ | col_name|data_type| comment| +--------------------+---------+-----------------+ | area_id| string|from deserializer| | on_behalf_of_id| string|from deserializer| | organization_id| string|from deserializer| | role| string|from deserializer| | person_id| string|from deserializer| |legislative_perio...| string|from deserializer| | start_date| string|from deserializer| | end_date| string|from deserializer| +--------------------+---------+-----------------+

形式の SerDe クラスがジョブのクラスパスで使用できない場合は、以下に示すようなエラーが表示されます。

>>> spark.sql("describe memberships_json").show() Caused by: MetaException(message:java.lang.ClassNotFoundException Class org.openx.data.jsonserde.JsonSerDe not found) at org.apache.hadoop.hive.metastore.MetaStoreUtils.getDeserializer(MetaStoreUtils.java:399) at org.apache.hadoop.hive.ql.metadata.Table.getDeserializerFromMetaStore(Table.java:276) ... 64 more

memberships テーブルとは異なる organization_id だけを表示するには、次の SQL クエリを実行します。

>>> spark.sql("select distinct organization_id from memberships_json").show() +--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

動的フレームで同じ手順を実行する必要がある場合は、以下を実行してください。

>>> memberships = glueContext.create_dynamic_frame.from_catalog(database="legislators", table_name="memberships_json") >>> memberships.toDF().createOrReplaceTempView("memberships") >>> spark.sql("select distinct organization_id from memberships").show() +--------------------+ | organization_id| +--------------------+ |d56acebe-8fdc-47b...| |8fa6c3d2-71dc-478...| +--------------------+

DynamicFrame は ETL オペレーション用に最適化されていますが、Spark SQL が データカタログ にアクセスできるようにすると、複雑な SQL ステートメントを実行したり、既存のアプリケーションを移植するための簡潔な方法が得られます。