用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例 - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

用于在 DynamoDB 中导出、导入和查询数据的 Hive 命令示例

以下示例使用 Hive 命令执行诸如将数据导出到 Amazon S3 或HDFS将数据导入到 DynamoDB、联接表、查询表等操作。

对 Hive 表执行的操作将引用 DynamoDB 中存储的数据。Hive 命令受到 DynamoDB 表预置的吞吐量设置约束,并且检索的数据包括 DynamoDB 处理 Hive 操作请求时写入到 DynamoDB 表的数据。如果数据检索过程需要很长一段时间,则自 Hive 命令开始执行以来,Hive 命令返回的某些数据可能已在 DynamoDB 中更新。

Hive 命令 DROP TABLECREATE TABLE 仅对 Hive 中的本地表进行操作,而不会在 DynamoDB 中创建或删除表。如果 Hive 查询引用 DynamoDB 中的表,则在您运行查询之前,该表必须已存在。有关在 DynamoDB 中创建和删除表的更多信息,请参阅 Amazon DynamoDB 开发人员指南中的在 DynamoDB 中处理表

注意

当您将 Hive 表映射到 Amazon S3 中的位置时,请勿将其映射到存储桶的根路径 s3://mybucket,否则这会在 Hive 将数据写入到 Amazon S3 时导致错误。而是应将表映射到存储桶的子路径 s3://mybucket/mypath。

从 DynamoDB 中导出数据

可以使用 Hive 从 DynamoDB 中导出数据。

将 DynamoDB 表导出到 Amazon S3 存储桶
  • 创建一个引用 DynamoDB 中存储的数据的 Hive 表。然后,您可以调用INSERTOVERWRITE命令将数据写入外部目录。在以下示例中,s3://bucketname/path/subpath/ 是 Amazon S3 中的有效路径。调整CREATE命令中的列和数据类型,使其与 DynamoDB 中的值相匹配。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的存档。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE DIRECTORY 's3://bucketname/path/subpath/' SELECT * FROM hiveTableName;
使用格式设置将 DynamoDB 表导出到 Amazon S3 存储桶
  • 创建引用 Amazon S3 中的位置的外部表。此表在下面显示为 s3_export。在CREATE调用期间,为表指定行格式。然后,当您使用INSERTOVERWRITE将数据从 DynamoDB 导出到 s3_export 时,数据将以指定的格式写出。在以下示例中,数据以逗号分隔值 () CSV 的形式写出。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3_export SELECT * FROM hiveTableName;
在不指定列映射的情况下将 DynamoDB 表导出到 Amazon S3 存储桶
  • 创建一个引用 DynamoDB 中存储的数据的 Hive 表。此例与前面的示例类似,只是不指定列映射。该表必须正好具有类型为 map<string, string> 的一个列。如果您随后在 Amazon S3 中创建 EXTERNAL 表,可以调用 INSERT OVERWRITE 命令将数据从 DynamoDB 写入到 Amazon S3。可以使用此命令在 Amazon S3 中创建 DynamoDB 数据的存档。由于没有列映射,因此您无法查询以此方式导出的表。Hive 0.8.1.5 或更高版本支持在不指定列映射的情况下导出数据,Amazon 2.2 支持该版本。EMR AMI x 及更高版本。

    CREATE EXTERNAL TABLE hiveTableName (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1"); CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3TableName SELECT * FROM hiveTableName;
使用数据压缩将 DynamoDB 表导出到 Amazon S3 存储桶
  • Hive 提供多个可以在 Hive 会话期间设置的压缩编解码器。这样做会导致导出的数据以指定的格式进行压缩。以下示例使用 Lempel-Ziv-Oberhumer () 算法压缩导出的文件。LZO

    SET hive.exec.compress.output=true; SET io.seqfile.compression.type=BLOCK; SET mapred.output.compression.codec = com.hadoop.compression.lzo.LzopCodec; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE lzo_compression_table (line STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE lzo_compression_table SELECT * FROM hiveTableName;

    可用的压缩编解码器包括:

    • org.apache.hadoop.io.compress。 GzipCodec

    • org.apache.hadoop.io.compress。 DefaultCodec

    • com.hadoop.compression.lzo。 LzoCodec

    • com.hadoop.compression.lzo。 LzopCodec

    • org.apache.hadoop.io.compress。 BZip2Codec

    • org.apache.hadoop.io.compress。 SnappyCodec

将 DynamoDB 表导出到 HDFS
  • 使用以下 Hive 命令,其中 hdfs:///directoryName 是有效的HDFS路径并且 hiveTableName 是 Hive 中引用 DynamoDB 的表。此导出操作比将 DynamoDB 表导出到 Amazon S3 要快,因为 Hive 0.7.1.1 在将数据导出到亚马逊 S3 时HDFS用作中间步骤。以下示例还显示了如何将 dynamodb.throughput.read.percent 设置为 1.0 以提高读取请求速率。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); SET dynamodb.throughput.read.percent=1.0; INSERT OVERWRITE DIRECTORY 'hdfs:///directoryName' SELECT * FROM hiveTableName;

    您也可以HDFS使用格式化和压缩将数据导出到,如上所示,以便导出到 Amazon S3。为此,只需将上面示例中的 Amazon S3 目录替换为一个HDFS目录即可。

在 Hive 中读取不可打印的 UTF -8 个字符数据
  • 通过在创建表时使用STORED AS SEQUENCEFILE子句,可以使用 Hive 读取和写入不可打印的 UTF -8 个字符的数据。A SequenceFile 是 Hadoop 二进制文件格式;你需要使用 Hadoop 来读取这个文件。以下示例显示了如何将数据从 DynamoDB 导出到 Amazon S3 中。您可以使用此功能来处理不可打印的 UTF -8 编码字符。

    CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); CREATE EXTERNAL TABLE s3_export(a_col string, b_col bigint, c_col array<string>) STORED AS SEQUENCEFILE LOCATION 's3://bucketname/path/subpath/'; INSERT OVERWRITE TABLE s3_export SELECT * FROM hiveTableName;

将数据导入到 DynamoDB

使用 Hive 将数据写入到 DynamoDB 中时,应确保写入容量单位数大于集群中的映射器数。例如,在 m1.xlarge 实例上运行的集群每个EC2实例生成 8 个映射器。对于具有 10 个实例的集群,这意味着生成 80 个映射器。如果写入容量单位数不大于集群中的映射器数,则 Hive 写入操作可能会占用所有写入吞吐量,或者尝试占用超过预配置值的吞吐量。有关每种EC2实例类型生成的映射器数量的更多信息,请参阅配置 Hadoop

Hadoop 中的映射器数由输入的拆分数控制。如果拆分数过小,写入命令可能无法占用所有可用的写入吞吐量。

如果具有相同键的项目在目标 DynamoDB 表中存在,则将覆盖该项目。如果目标 DynamoDB 表中不存在具有该键的项目,则将插入该项目。

要将数据从 Amazon S3 导入 DynamoDB
  • 您可以使用亚马逊EMR(亚马逊EMR)和 Hive 将数据从亚马逊 S3 写入 DynamoDB。

    CREATE EXTERNAL TABLE s3_import(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3_import;
在不指定列映射时将表从 Amazon S3 存储桶导入到 DynamoDB 中
  • 创建一个引用 Amazon S3 中存储数据的 EXTERNAL 表,该数据是以前从 DynamoDB 中导出的。在导入之前,请确保该表存在于 DynamoDB 中,并且该表与以前导出的 DynamoDB 表具有相同的键架构。此外,该表还必须正好具有类型为 map<string, string> 的一个列。如果您随后创建一个链接到 DynamoDB 的 Hive 表,则可以调用 INSERT OVERWRITE 命令将数据从 Amazon S3 写入到 DynamoDB 中。由于没有列映射,因此您无法查询以此方式导入的表。Hive 0.8.1.5 或更高版本支持在不指定列映射的情况下导入数据,Amazon EMR AMI 2.2.3 及更高版本支持该功能。

    CREATE EXTERNAL TABLE s3TableName (item map<string, string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' LOCATION 's3://bucketname/path/subpath/'; CREATE EXTERNAL TABLE hiveTableName (item map<string,string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM s3TableName;
将表从导入 Dynam HDFS oDB
  • 您可以使用 Amazon EMR 和 Hive 将数据从写入 DynamoD HDFS B。

    CREATE EXTERNAL TABLE hdfs_import(a_col string, b_col bigint, c_col array<string>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 'hdfs:///directoryName'; CREATE EXTERNAL TABLE hiveTableName (col1 string, col2 bigint, col3 array<string>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "dynamodbtable1", "dynamodb.column.mapping" = "col1:name,col2:year,col3:holidays"); INSERT OVERWRITE TABLE hiveTableName SELECT * FROM hdfs_import;

查询 DynamoDB 中的数据

以下示例显示了您可以使用 Amazon EMR 查询存储在 DynamoDB 中的数据的各种方式。

查找映射列的最大值 (max)
  • 使用如下 Hive 命令。在第一个命令中,该CREATE语句创建一个 Hive 表,该表引用存储在 DynamoDB 中的数据。然后,该SELECT语句使用该表来查询存储在 DynamoDB 中的数据。以下示例查找给定客户提交的最大订单。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); SELECT max(total_cost) from hive_purchases where customerId = 717;
使用 GROUP BY 子句聚合数据
  • 可以使用 GROUP BY 子句收集多条记录的数据。此子句通常与聚合函数 (如 sum、count、min 或 max) 一起使用。以下示例返回提交了三个以上订单的客户的最大订单列表。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); SELECT customerId, max(total_cost) from hive_purchases GROUP BY customerId HAVING count(*) > 3;
连接两个 DynamoDB 表
  • 以下示例将两个 Hive 表映射到 DynamoDB 中存储的数据。然后,它对这两个表调用联接。连接在集群上计算并返回。连接不在 DynamoDB 中进行。此示例返回提交了两个以上订单的客户及其购买物的列表。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); CREATE EXTERNAL TABLE hive_customers(customerId bigint, customerName string, customerAddress array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Customers", "dynamodb.column.mapping" = "customerId:CustomerId,customerName:Name,customerAddress:Address"); Select c.customerId, c.customerName, count(*) as count from hive_customers c JOIN hive_purchases p ON c.customerId=p.customerId GROUP BY c.customerId, c.customerName HAVING count > 2;
联接来自不同源的两个表
  • 在以下示例中,Customer_S3 是一个 Hive 表,用于加载存储在 Amazon S3 中的CSV文件,而 hive_purchases 是一个引用 DynamoDB 中数据的表。以下示例将以CSV文件形式存储在 Amazon S3 中的客户数据与存储在 DynamoDB 中的订单数据合并在一起,返回一组数据,这些数据表示名字中有 “Miller” 的客户下达的订单。

    CREATE EXTERNAL TABLE hive_purchases(customerId bigint, total_cost double, items_purchased array<String>) STORED BY 'org.apache.hadoop.hive.dynamodb.DynamoDBStorageHandler' TBLPROPERTIES ("dynamodb.table.name" = "Purchases", "dynamodb.column.mapping" = "customerId:CustomerId,total_cost:Cost,items_purchased:Items"); CREATE EXTERNAL TABLE Customer_S3(customerId bigint, customerName string, customerAddress array<String>) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION 's3://bucketname/path/subpath/'; Select c.customerId, c.customerName, c.customerAddress from Customer_S3 c JOIN hive_purchases p ON c.customerid=p.customerid where c.customerName like '%Miller%';
注意

在前面的示例中,为了清晰和完整,在每个示例中都包含了CREATETABLE语句。针对给定 Hive 表运行多个查询或执行导出操作时,只需在 Hive 会话的开始创建表一次即可。