使用 Trino 处理冰山牌桌 - AWS 规范性指导

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Trino 处理冰山牌桌

本节介绍如何在 Amazon EM R 上使用 Trino 设置和操作 Iceberg 表。示例是可以在集群上的 Amazon EMR 上运行的样板代码。 EC2 本节中的代码示例和配置假设您使用的是 Amazon EMR 版本 emr-7.9.0。

安装时的 Amazon EMR EC2

  1. 创建iceberg.properties包含以下内容的文件。如果语句中未明确指定新表的默认存储格式,则该iceberg.file-format=parquet设置将确定新CREATE TABLE表的默认存储格式。

    connector.name=iceberg iceberg.catalog.type=glue iceberg.file-format=parquet fs.native-s3.enabled=true
  2. iceberg.properties 文件上传到 S3 存储桶。 

  3. 创建一个引导操作,该操作从您的 S3 存储桶中复制iceberg.properties文件并将其作为 Trino 配置文件存储在您将要创建的 Amazon EMR 集群上。 请务必<S3-bucket-name>使用您的 S3 存储桶名称替换。 

    #!/bin/bash set -ex sudo aws s3 cp s3://<S3-bucket-name>/iceberg.properties /etc/trino/conf/catalog/iceberg.properties
  4. 创建安装了 Trino 的 Amazon EMR 集群,并将前一个脚本的执行指定为引导操作。以下是创建集群的示例 AWS Command Line Interface (AWS CLI) 命令:

    aws emr create-cluster --release-label emr-7.9.0 \ --applications Name=Trino \ --region <region> \ --name Trino_Iceberg_Cluster \ --bootstrap-actions '[{"Path":"s3://<S3-bucket-name>/bootstrap.sh","Name":"Add iceberg.properties"}]' \ --instance-groups '[{"InstanceGroupType":"MASTER","InstanceCount":1,"InstanceType":"m5.xlarge"},{"InstanceGroupType":"CORE","InstanceCount":3,"InstanceType":"m5.xlarge"}]' \ --service-role "<IAM-service-role>" \ --ec2-attributes '{"KeyName":"<key-name>","InstanceProfile":"<EMR-EC2-instance-profile>"}'

    你在哪里替换:

    • <S3-bucket-name>使用您的 S3 存储桶名称

    • <region>根据你的具体情况 AWS 区域

    • <key-name>用你的 key pair。如果密钥对(key pair)不存在,则会将其创建。

    • <IAM-service-role>使用遵循最低权限原则的 Amazon EMR 服务角色。 

    • <EMR-EC2-instance-profile>使用您的实例配置文件。 

  5. 初始化 Amazon EMR 集群后,您可以通过运行以下命令来初始化 Trino 会话:

    trino-cli
  6. 在 Trino CLI 中,您可以通过运行以下命令来查看目录:

    SHOW CATALOGS;

创建 Iceberg 表

要创建 Iceberg 表,你可以使用CREATE TABLE语句。  以下是创建使用 Iceberg 隐藏分区的分区表的示例:

CREATE TABLE iceberg.iceberg_db.iceberg_table ( userid int, firstname varchar, city varchar) WITH ( format = 'PARQUET', partitioning = ARRAY['city', 'bucket(userid, 16)'], location = 's3://<S3-bucket>/<prefix>');
注意

如果您未指定格式,则将使用您在上一节中配置的iceberg.file-format值。

要插入数据,请使用INSERT INTO命令。示例如下:

INSERT INTO iceberg.iceberg_db.iceberg_table (userid, firstname, city) VALUES (1001, 'John', 'New York'), (1002, 'Mary', 'Los Angeles'), (1003, 'Mateo', 'Chicago'), (1004, 'Shirley', 'Houston'), (1005, 'Diego', 'Miami'), (1006, 'Nikki', 'Seattle'), (1007, 'Pat', 'Boston'), (1008, 'Terry', 'San Francisco'), (1009, 'Richard', 'Denver'), (1010, 'Pat', 'Phoenix');

从冰山桌上读书

您可以使用SELECT语句读取 Iceberg 表的最新状态,如下所示:

SELECT * FROM iceberg.iceberg_db.iceberg_table;

将数据更新到 Iceberg 表中

您可以使用MERGE INTO语句执行 upsert 操作(同时插入新记录和更新现有记录)。示例如下:

MERGE INTO iceberg.iceberg_db.iceberg_table target USING ( VALUES (1001, 'John Updated', 'Boston'), -- Update existing user (1002, 'Mary Updated', 'Seattle'), -- Update existing user (1011, 'Martha', 'Portland'), -- Insert new user (1012, 'Paulo', 'Austin') -- Insert new user ) AS source (userid, firstname, city) ON target.userid = source.userid WHEN MATCHED THEN UPDATE SET firstname = source.firstname, city = source.city WHEN NOT MATCHED THEN INSERT (userid, firstname, city) VALUES (source.userid, source.firstname, source.city);

从 Iceberg 表中删除记录

要从 Iceberg 表中删除数据,请使用DELETE FROM表达式并指定与要删除的行匹配的筛选器。示例如下:

DELETE FROM iceberg.iceberg_db.iceberg_table WHERE userid IN (1003, 1004);

查询 Iceberg 表元数据

Iceberg 通过 SQL 提供对其元数据的访问。您可以通过查询命名空间来访问任何给定表 (<table_name>) 的元数据"<table_name>.$<metadata_table>"。有关元数据表的完整列表,请参阅 Iceberg 文档中的检查表

以下是用于检查 Iceberg 元数据的示例查询列表:

SELECT FROM iceberg.iceberg_db."iceberg_table$snapshots"; SELECT FROM iceberg.iceberg_db."iceberg_table$history"; SELECT FROM iceberg.iceberg_db."iceberg_table$partitions"; SELECT FROM iceberg.iceberg_db."iceberg_table$files"; SELECT FROM iceberg.iceberg_db."iceberg_table$manifests"; SELECT FROM iceberg.iceberg_db."iceberg_table$refs"; SELECT * FROM iceberg.iceberg_db."iceberg_table$metadata_log_entries";

例如,此查询:

SELECT * FROM iceberg.iceberg_db."iceberg_table$snapshots";

提供了输出:

查询 Iceberg 表元数据的输出。

使用时空旅行

Iceberg 表中的每个写入操作(插入、更新、更新或删除)都会创建一个新快照。然后,您可以使用这些快照进行时空旅行,以回到过去,检查表的过去状态。

以下时空旅行查询根据特定内容显示表的状态snapshot_id

SELECT * FROM iceberg.iceberg_db.iceberg_table FOR VERSION AS OF 241938428756831817;

以下时空旅行查询根据特定的时间戳显示表的状态:

SELECT * FROM iceberg.iceberg_db.iceberg_table FOR TIMESTAMP AS OF TIMESTAMP '2025-05-28 16:09:40.268 UTC'

将 Iceberg 与 Trino 搭配使用时的注意事项

Trino 对 Iceberg 表的写入操作遵循了merge-on-read设计,因此它们会创建位置删除文件,而不是重写受更新或删除影响的整个数据文件。如果你想使用 copy-on-write这种方法,可以考虑使用 Spark 进行写入操作。