DynamoDB 内データのエクスポート、インポート、クエリを行う Hive コマンドの使用例 - Amazon EMR

DynamoDB 内データのエクスポート、インポート、クエリを行う Hive コマンドの使用例

以下の例では、Hive コマンドを使用して、データを Amazon S3 や HDFS にエクスポートする、データを DynamoDB にインポートする、テーブルを結合する、テーブルにクエリを行うなどの操作を行います。

Hive テーブルに対する操作では、DynamoDB に格納されているデータを参照します。Hive コマンドは、DynamoDB テーブルに設定されたスループット設定の制約を受けます。また、取得されるデータには、Hive 操作リクエストが DynamoDB で処理された時点で DynamoDB テーブルに書き込まれているデータが含まれます。データ取得プロセスに時間がかかる場合、Hive コマンドによって返されたデータには、Hive コマンドの開始後に DynamoDB で更新されたものが含まれる可能性があります。

Hive コマンドの DROP TABLECREATE TABLE は、Hive のローカルテーブルにのみ作用し、DynamoDB のテーブルの作成または削除は行いません。Hive クエリが DynamoDB のテーブルを参照する場合、そのテーブルは、クエリを実行する前に存在している必要があります。DynamoDB でのテーブルの作成と削除の詳細については、「Amazon DynamoDB デベロッパーガイド」の「DynamoDB のテーブルの操作」を参照してください。

注記

Hive テーブルを Amazon S3 内の場所にマッピングする場合、バケットのルートパス (s3://mybucket) にはマッピングしないでください。これを行うと、Hive がデータを Amazon S3 に書き込むときにエラーが発生することがあります。代わりに、テーブルをバケットのサブパス(s3://mybucket/mypath)にマッピングします。

DynamoDB からデータをエクスポートする

Hive を使用して DynamoDB のデータをエクスポートすることができます。

DynamoDB テーブルを Amazon S3 バケットにエクスポートするには
  • DynamoDB に格納されたデータを参照する Hive テーブルを作成します。次に、INSERT OVERWRITE コマンドを呼び出して、データを外部ディレクトリに書き込みます。次の例では、s3://bucketname/path/subpath/ は Amazon S3. 内の有効なパスです。CREATE コマンドで列とデータ型が DynamoDB 内の値と一致するように調整します。これを使用して DynamoDB データのアーカイブを Amazon S3 に作成できます。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE DIRECTORY 's3://bucketname/path/subpath/' SELECT * FROM hiveTableName;
書式設定を使用して DynamoDB テーブルを Amazon S3 バケットにエクスポートするには
  • Amazon S3 内の場所を参照する外部テーブルを作成します。これは次の例では s3_export です。CREATE の呼び出しで、テーブルの行の書式を指定します。次に、INSERT OVERWRITE を使用して DynamoDB のデータを s3_export にエクスポートすると、データは指定の書式で書き込まれます。次の例では、データはカンマ区切り値(CSV)形式で書き込まれます。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3_export SELECT * FROM hiveTableName;
列のマッピングを指定せずに DynamoDB テーブルを Amazon S3 バケットにエクスポートするには
  • DynamoDB に格納されたデータを参照する Hive テーブルを作成します。これは、カラムマッピングを指定しないこと以外、前の例とほぼ同じです。このテーブルには map<string, string> 型の列が 1 つだけ含まれている必要があります。次に、Amazon S3 に EXTERNAL テーブルを作成したら、INSERT OVERWRITE コマンドを呼び出して DynamoDB のデータを Amazon S3 に書き込みます。これを使用して DynamoDB データのアーカイブを Amazon S3 に作成できます。カラムマッピングがないので、この方法でエクスポートされたテーブルをクエリすることはできません。カラムマッピングを指定しないデータのエクスポートは、Amazon EMR AMI 2.2.x 以降でサポートされる Hive 0.8.1.5 以降で利用できます。

    CREATE EXTERNAL TABLE hiveTableName (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1"); CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3TableName SELECT * FROM hiveTableName;
データ圧縮を使用して DynamoDB テーブルを Amazon S3 バケットにエクスポートするには
  • Hive では、Hive セッション中に設定できる圧縮コーデックが複数あります。これを行うことで、エクスポートデータは、指定した形式で圧縮されます。次の例では、LZO(Lempel-Ziv-Oberhumer)アルゴリズムを使用して、エクスポートされたファイルを圧縮します。

    SET hive.exec.compress.output=true; SET io.seqfile.compression.type=BLOCK; SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE lzo_compression_table (line STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE lzo_compression_table SELECT * FROM hiveTableName;

    以下の圧縮コーデックを利用できます。

    • org.apache.hadoop.io.compress.GzipCodec

    • org.apache.hadoop.io.compress.DefaultCodec

    • com.hadoop.compression.lzo.LzoCodec

    • com.hadoop.compression.lzo.LzopCodec

    • org.apache.hadoop.io.compress.BZip2Codec

    • org.apache.hadoop.io.compress.SnappyCodec

DynamoDB テーブルを HDFS にエクスポートするには
  • 次の Hive コマンドを使用します。hdfs:///directoryName は有効な HDFS パスであり、hiveTableName は Hive のテーブルで、DynamoDB を参照しています。Hive 0.7.1.1 はデータを Amazon S3 にエクスポートする際に HDFS を中間ステップとして使用するので、このエクスポート操作は、DynamoDB テーブルを Amazon S3 にエクスポートするよりも高速です。また、次の例は、dynamodb.throughput.read.percent を 1.0 に設定して読み取りリクエストレートを上げる方法を示しています。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); SET dynamodb.throughput.read.percent=1.0; INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;

    Amazon S3 へのエクスポートで説明したように、書式設定や圧縮を使用して HDFS にデータをエクスポートすることもできます。これを行うには、上記の例の Amazon S3 ディレクトリを HDFS ディレクトリに置き換えるだけです。

印刷不可の UTF-8 文字データを Hive で読み取るには
  • テーブル作成時に STORED AS SEQUENCEFILE 句を使用すると、印刷不可の UTF-8 文字データを Hive で読み取ることができます。SequenceFile は Hadoop バイナリファイル形式です。このファイルを読み取るには Hadoop を使用する必要があります。次の例に、DynamoDB から Amazon S3 にデータをエクスポートする方法を示します。この機能を使用して印刷不可の UTF-8 でエンコードされた文字を処理できます。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>) STORED AS SEQUENCEFILE LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3_export SELECT * FROM hiveTableName;

DynamoDB へのデータのアップロード

Hive を使用してデータを DynamoDB に書き込む場合は、書き込み容量単位の数をクラスター内のマッパーの数より大きいことを確認する必要があります。たとえば、m1.xlarge EC2 インスタンスで実行されるクラスターは、インスタンスごとに 8 個のマッパーを作成します。10 個のインスタンスがあるクラスターの場合は、合計 80 個のマッパーがあることになります。書き込み容量単位がクラスター内のマッパーの数以下である場合、Hive の書き込み操作は書き込みスループットを消費しきるか、プロビジョニングされた以上のスループットを消費しようとします。各 EC2 インスタンスタイプによって生成されるマッパーの数の詳細については、Hadoop の設定 を参照してください。

Hadoop 内のマッパーの数は入力分割数によって決まります。分割数が少なすぎる場合、書き込みコマンドは書き込みスループットを消費しきれない可能性があります。

同じキーを持つ項目がターゲット DynamoDB テーブルに存在する場合、項目は上書きされます。キーを持つ項目がターゲット DynamoDB テーブルに存在しない場合、その項目は挿入されます。

Amazon S3 のテーブルを DynamoDB にインポートするには
  • Amazon EMR (Amazon EMR) と Hive を使用して、Amazon S3 から DynamoDB にデータを書き込むことができます。

    CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
カラムマッピングを指定せずに Amazon S3 バケットのテーブルを DynamoDB にインポートするには
  • 以前 DynamoDB からエクスポートされ Amazon S3 に格納されたデータを参照する EXTERNAL テーブルを作成します。インポートする前に、テーブルが DynamoDB に存在することと、そのキースキーマが、以前エクスポートされた DynamoDB テーブルと同じであることを確認します。また、テーブルには map<string, string> 方の列が 1 つだけ含まれる必要があります。次に、DynamoDB にリンクされている Hive テーブルを作成したら、INSERT OVERWRITE コマンドを呼び出して Amazon S3 のデータを DynamoDB に書き込みます。カラムマッピングがないので、この方法でインポートされたテーブルにクエリを行うことはできません。カラムマッピングを指定しないデータのインポートは、Amazon EMR AMI 2.2.3 以降でサポートされる Hive 0.8.1.5 以降で利用できます。

    CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; CREATE EXTERNAL TABLE hiveTableName (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3TableName;
HDFS のテーブルを DynamoDB にインポートするには
  • Amazon EMR と Hive を使用して HDFS のデータを DynamoDB にインポートすることができます。

    CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'hdfs:///directoryName'; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;

DynamoDB 内データのクエリ

次の例は、Amazon EMR を使用して DynamoDB のデータにクエリを行うさまざまな方法を示します。

マッピングされたカラムで最大値を検索するには(max
  • 次のような Hive コマンドを使用します。最初のコマンドの CREATE ステートメントで、DynamoDB に格納されたデータを参照する Hive テーブルを作成します。次に、SELECT ステートメントで、そのテーブルを使用して、DynamoDB に格納されたデータのクエリを行います。次の例では、指定された顧客による最大の注文を検索します。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); SELECT max(total_cost) from hive_purchases where customerId = 717;
GROUP BY 句を使用してデータを集計するには
  • GROUP BY 句を使用して、複数のレコードのデータを収集できます。多くの場合、これは sum、count、min、または max のような集計関数とともに使用されます。次の例は、4 件以上注文した顧客の注文のうち最大の注文のリストを返します。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
2 つの DynamoDB テーブルを結合するには
  • 次の例では、2 つの Hive テーブルを DynamoDB に格納されているデータにマッピングします。その後、2 つのテーブルに対して join を呼び出します。結合はクラスターで計算され、以下を返します。結合は DynamoDB 内では発生しません。この例は、3 件以上注文した顧客について、顧客とその購入品のリストを返します。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Customers", "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address"); Select c.customerId, c.customerName, count(*) as count from hive_customers c JOIN hive_purchases p ON c.customerId=p.customerId GROUP BY c.customerId, c.customerName HAVING count > 2;
異なるソースの 2 つのテーブルを結合するには
  • 次の例の Customer_S3 は、Amazon S3 に格納された CSV ファイルを読み込む Hive テーブルであり、hive_purchases は、DynamoDB のデータを参照するテーブルです。次の例では、Amazon S3 に CSV ファイルとして格納されている顧客データと、DynamoDB に格納されている注文データを結合し、名前に「Miller」が含まれる顧客による注文を表すデータのセットを返します。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; Select c.customerId, c.customerName, c.customerAddress from Customer_S3 c JOIN hive_purchases p ON c.customerid=p.customerid where c.customerName like '%Miller%';
注記

前の各例には、明確かつ完全にするために CREATE TABLE 句が含まれていました。指定された Hive テーブルに対して複数のクエリやエクスポート操作を実行する場合は、Hive セッションの開始時にテーブルを一度作成するだけで済みます。