Working with Iceberg tables by using PyIceberg
This section explains how you can interact with Iceberg tables by using PyIceberg.
Prerequisites
Note
These examples use PyIceberg 1.9.1.
To work with PyIceberg, you need PyIceberg and the AWS SDK for Python (Boto3) installed. Here's an example of how you can set up a Python virtual environment to work with PyIceberg and the AWS Glue Data Catalog:
- Install PyIceberg by using the pip Python package installer. You also need Boto3 to interact with AWS services. You can configure a local Python virtual environment for testing by using these commands:

  python3 -m venv my_env
  cd my_env/bin/
  source activate
  pip install "pyiceberg[pyarrow,pandas,glue]"
  pip install boto3
- Run python to open the Python shell and test the commands.
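To confirm that the installation succeeded, you can print the installed versions from the Python shell (a quick check, assuming both packages installed cleanly):

import pyiceberg
import boto3

# Both packages expose their version strings
print(pyiceberg.__version__)  # for example, 1.9.1
print(boto3.__version__)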
Connecting to the Data Catalog
To start working with Iceberg tables in AWS Glue, you first need to connect to the AWS Glue Data Catalog.
The load_catalog function initializes a connection to the Data Catalog by creating a catalog object:
from pyiceberg.catalog import load_catalog

region = "us-east-1"

glue_catalog = load_catalog(
    'default',
    **{
        'client.region': region
    },
    type='glue'
)
Listing and creating databases
To list existing databases, use the list_namespaces function:
databases = glue_catalog.list_namespaces()
print(databases)
To create a new database, use the create_namespace function:
database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})
Creating and writing Iceberg tables
Unpartitioned tables
Here's an example of creating an unpartitioned Iceberg table by using the create_table function:
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, DoubleType

database_name = "mydb"
table_name = "pyiceberg_table"
s3_table_path = f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}"

schema = Schema(
    NestedField(1, "city", StringType(), required=False),
    NestedField(2, "lat", DoubleType(), required=False),
    NestedField(3, "long", DoubleType(), required=False),
)

glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)
You can use the list_tables function to check the list of tables in a database:
tables = glue_catalog.list_tables(namespace=database_name)
print(tables)
You can use the append function and PyArrow to insert data into an Iceberg table:
import pyarrow as pa

df = pa.Table.from_pylist(
    [
        {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029},
        {"city": "San Francisco", "lat": 37.773972, "long": -122.431297},
        {"city": "Drachten", "lat": 53.11254, "long": 6.0989},
        {"city": "Paris", "lat": 48.864716, "long": 2.349014},
    ],
)

table = glue_catalog.load_table(f"{database_name}.{table_name}")
table.append(df)
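To verify the append, you can scan the table back into a PyArrow table and check the row count (a minimal sketch; the count reflects the four rows inserted above):

# Read the table back and confirm the inserted rows arrived
result = table.scan().to_arrow()
print(result.num_rows)  # expected: 4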
Partitioned tables
Here's an example of creating a partitioned Iceberg table by using the create_table function and a PartitionSpec:
from pyiceberg.schema import Schema
from pyiceberg.types import (
    NestedField,
    StringType,
    FloatType,
    DoubleType,
    TimestampType,
)
from pyiceberg.partitioning import PartitionSpec, PartitionField
from pyiceberg.transforms import DayTransform

# Define the schema
schema = Schema(
    NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True),
    NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True),
    NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False),
    NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False),
    NestedField(field_id=5, name="height", field_type=FloatType(), required=False),
)

# Partition by day on the "datetime" field
partition_spec = PartitionSpec(
    PartitionField(
        source_id=1,  # Refers to "datetime"
        field_id=1000,
        transform=DayTransform(),
        name="datetime_day"
    )
)

database_name = "mydb"
partitioned_table_name = "pyiceberg_table_partitioned"
s3_table_path = f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}"

glue_catalog.create_table(
    identifier=f"{database_name}.{partitioned_table_name}",
    schema=schema,
    location=s3_table_path,
    partition_spec=partition_spec
)
You can insert data into a partitioned table the same way as for an unpartitioned table. The partitioning is handled automatically.
from datetime import datetime

arrow_schema = pa.schema([
    pa.field("datetime", pa.timestamp("us"), nullable=False),
    pa.field("drone_id", pa.string(), nullable=False),
    pa.field("lat", pa.float64()),
    pa.field("lon", pa.float64()),
    pa.field("height", pa.float32()),
])

data = [
    {
        "datetime": datetime(2024, 6, 1, 12, 0, 0),
        "drone_id": "drone_001",
        "lat": 52.371807,
        "lon": 4.896029,
        "height": 120.5,
    },
    {
        "datetime": datetime(2024, 6, 1, 12, 5, 0),
        "drone_id": "drone_002",
        "lat": 37.773972,
        "lon": -122.431297,
        "height": 150.0,
    },
    {
        "datetime": datetime(2024, 6, 2, 9, 0, 0),
        "drone_id": "drone_001",
        "lat": 53.11254,
        "lon": 6.0989,
        "height": 110.2,
    },
    {
        "datetime": datetime(2024, 6, 2, 9, 30, 0),
        "drone_id": "drone_003",
        "lat": 48.864716,
        "lon": 2.349014,
        "height": 145.7,
    },
]

df = pa.Table.from_pylist(data, schema=arrow_schema)

table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}")
table.append(df)
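Because DayTransform partitions rows by the date portion of datetime, the four rows above should land in two day partitions. To confirm this, you can use PyIceberg's table inspection API (a minimal sketch; inspect.partitions returns a PyArrow table of partition summaries):

# Summarize the table's partitions; expect one entry per day
partitions = table.inspect.partitions()
print(partitions.to_pandas())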
Reading data
You can use the PyIceberg scan function to read data from your Iceberg tables. You can filter rows, select specific columns, and limit the number of returned records.
table = glue_catalog.load_table(f"{database_name}.{table_name}")

scan_df = table.scan(
    row_filter="city = 'Amsterdam'",
    selected_fields=("city", "lat"),
    limit=100,
).to_pandas()

print(scan_df)
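If you prefer to stay in Arrow instead of pandas, the same scan can be materialized with to_arrow (a small variation on the example above):

# Materialize the scan as a PyArrow table instead of a pandas DataFrame
scan_arrow = table.scan(
    row_filter="city = 'Amsterdam'",
    selected_fields=("city", "lat"),
).to_arrow()
print(scan_arrow)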
Deleting data
The PyIceberg delete function lets you remove records from your table by using a delete_filter:
table = glue_catalog.load_table(f"{database_name}.{table_name}")
table.delete(delete_filter="city == 'Paris'")
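To confirm the delete, you can scan the remaining rows; 'Paris' should no longer appear (a quick verification sketch):

# Scan the table after the delete; the Paris row should be gone
remaining = table.scan(selected_fields=("city",)).to_pandas()
print(remaining)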
Accessing metadata
PyIceberg provides several functions to access table metadata. Here's how you can view information about table snapshots:
# List of snapshots
table.snapshots()

# Current snapshot
table.current_snapshot()

# Get the ID of the second-to-last snapshot
second_last_snapshot_id = table.snapshots()[-2].snapshot_id
print(f"Second last SnapshotID: {second_last_snapshot_id}")
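Each snapshot also carries a summary of the operation that produced it. Here's a minimal sketch that walks the snapshot history (assuming each snapshot has a populated summary, which append and delete operations produce):

# Walk the snapshot history and print what each operation did
for snap in table.snapshots():
    print(snap.snapshot_id, snap.timestamp_ms, snap.summary)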
For a detailed list of available metadata, see the metadata section of the PyIceberg documentation.
Using time travel
You can use table snapshots for time travel to access previous states of your table. Here's how to view the table state before the last operation:
second_last_snapshot_id = table.snapshots()[-2].snapshot_id

time_travel_df = table.scan(
    limit=100,
    snapshot_id=second_last_snapshot_id
).to_pandas()

print(time_travel_df)
For a complete list of available functions, see the PyIceberg Python API documentation.