Lavorare con le tabelle Iceberg utilizzando PyIceberg - AWS Guida prescrittiva

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Lavorare con le tabelle Iceberg utilizzando PyIceberg

Questa sezione spiega come è possibile interagire con le tabelle Iceberg utilizzando. PyIceberg Gli esempi forniti sono codice standard che puoi eseguire su EC2 istanze, AWS Lambdafunzioni di Amazon Linux 2023 o qualsiasi ambiente Python con credenziali configurate correttamente.AWS

Prerequisiti

Nota

PyIceberg Questi esempi utilizzano la versione 1.9.1.

Con cui lavorare PyIceberg, è necessario PyIceberg e AWS SDK per Python (Boto3) installato. Ecco un esempio di come è possibile configurare un ambiente virtuale Python con PyIceberg cui lavorare e: AWS Glue Data Catalog

  1. Scarica PyIcebergutilizzando il programma di installazione del pacchetto pip python. Hai anche bisogno di Boto3 con cui interagire. Servizi AWS Puoi configurare un ambiente virtuale Python locale da testare usando questi comandi:

    python3 -m venv my_env cd my_env/bin/ source activate pip install "pyiceberg[pyarrow,pandas,glue]" pip install boto3
  2. Esegui python per aprire la shell Python e testare i comandi.

Connessione al Data Catalog

Per iniziare a utilizzare le tabelle Iceberg in AWS Glue, devi prima connetterti a. AWS Glue Data Catalog 

La load_catalog funzione inizializza una connessione al Data Catalog creando un oggetto di catalogo che funge da interfaccia principale per tutte le operazioni di Iceberg:

from pyiceberg.catalog import load_catalog region = "us-east-1" glue_catalog = load_catalog( 'default', **{ 'client.region': region }, type='glue' )

Elencare e creare database

Per elencare i database esistenti, usa la list_namespaces funzione:

databases = glue_catalog.list_namespaces() print(databases)

Per creare un nuovo database, usa la create_namespace funzione:

database_name="mydb" s3_db_path=f"s3://amzn-s3-demo-bucket/{database_name}" glue_catalog.create_namespace(database_name, properties={"location": s3_db_path})

Creazione e scrittura di tabelle Iceberg

Tabelle non partizionate

Ecco un esempio di creazione di una tabella Iceberg non partizionata utilizzando la funzione: create_table

from pyiceberg.schema import Schema from pyiceberg.types import NestedField, StringType, DoubleType database_name="mydb" table_name="pyiceberg_table" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{table_name}" schema = Schema( NestedField(1, "city", StringType(), required=False), NestedField(2, "lat", DoubleType(), required=False), NestedField(3, "long", DoubleType(), required=False), ) glue_catalog.create_table(f"{database_name}.{table_name}", schema=schema, location=s3_table_path)

Puoi usare la list_tables funzione per controllare l'elenco delle tabelle all'interno di un database:

tables = glue_catalog.list_tables(namespace=database_name) print(tables)

Puoi usare la append funzione e PyArrow inserire dati all'interno di una tabella Iceberg:

import pyarrow as pa df = pa.Table.from_pylist( [ {"city": "Amsterdam", "lat": 52.371807, "long": 4.896029}, {"city": "San Francisco", "lat": 37.773972, "long": -122.431297}, {"city": "Drachten", "lat": 53.11254, "long": 6.0989}, {"city": "Paris", "lat": 48.864716, "long": 2.349014}, ], ) table = glue_catalog.load_table(f"{database_name}.{table_name}") table.append(df)

Tabelle partizionate

Ecco un esempio di creazione di una tabella Iceberg partizionata con partizionamento nascosto utilizzando la funzione e: create_table PartitionSpec

from pyiceberg.schema import Schema from pyiceberg.types import ( NestedField, StringType, FloatType, DoubleType, TimestampType, ) # Define the schema schema = Schema( NestedField(field_id=1, name="datetime", field_type=TimestampType(), required=True), NestedField(field_id=2, name="drone_id", field_type=StringType(), required=True), NestedField(field_id=3, name="lat", field_type=DoubleType(), required=False), NestedField(field_id=4, name="lon", field_type=DoubleType(), required=False), NestedField(field_id=5, name="height", field_type=FloatType(), required=False), ) from pyiceberg.partitioning import PartitionSpec, PartitionField from pyiceberg.transforms import DayTransform partition_spec = PartitionSpec( PartitionField( source_id=1, # Refers to "datetime" field_id=1000, transform=DayTransform(), name="datetime_day" ) ) database_name="mydb" partitioned_table_name="pyiceberg_table_partitioned" s3_table_path=f"s3://amzn-s3-demo-bucket/{database_name}/{partitioned_table_name}" glue_catalog.create_table( identifier=f"{database_name}.{partitioned_table_name}", schema=schema, location=s3_table_path, partition_spec=partition_spec )

È possibile inserire dati in una tabella partizionata allo stesso modo in cui si inserisce una tabella non partizionata. Il partizionamento viene gestito automaticamente.

from datetime import datetime arrow_schema = pa.schema([ pa.field("datetime", pa.timestamp("us"), nullable=False), pa.field("drone_id", pa.string(), nullable=False), pa.field("lat", pa.float64()), pa.field("lon", pa.float64()), pa.field("height", pa.float32()), ]) data = [ { "datetime": datetime(2024, 6, 1, 12, 0, 0), "drone_id": "drone_001", "lat": 52.371807, "lon": 4.896029, "height": 120.5, }, { "datetime": datetime(2024, 6, 1, 12, 5, 0), "drone_id": "drone_002", "lat": 37.773972, "lon": -122.431297, "height": 150.0, }, { "datetime": datetime(2024, 6, 2, 9, 0, 0), "drone_id": "drone_001", "lat": 53.11254, "lon": 6.0989, "height": 110.2, }, { "datetime": datetime(2024, 6, 2, 9, 30, 0), "drone_id": "drone_003", "lat": 48.864716, "lon": 2.349014, "height": 145.7, }, ] df = pa.Table.from_pylist(data, schema=arrow_schema) table = glue_catalog.load_table(f"{database_name}.{partitioned_table_name}") table.append(df)

Lettura dei dati

È possibile utilizzare la PyIceberg scan funzione per leggere i dati dalle tabelle Iceberg. È possibile filtrare righe, selezionare colonne specifiche e limitare il numero di record restituiti.

table= glue_catalog.load_table(f"{database_name}.{table_name}") scan_df = table.scan( row_filter=( f"city = 'Amsterdam'" ), selected_fields=("city", "lat"), limit=100, ).to_pandas() print(scan_df)

Eliminazione di dati

La PyIceberg delete funzione consente di rimuovere i record dalla tabella utilizzando undelete_filter:

table = glue_catalog.load_table(f"{database_name}.{table_name}") table.delete(delete_filter="city == 'Paris'")

Accesso ai metadati

PyIceberg fornisce diverse funzioni per accedere ai metadati delle tabelle. Ecco come è possibile visualizzare le informazioni sulle istantanee delle tabelle:

#List of snapshots table.snapshots() #Current snapshot table.current_snapshot() #Take a previous snapshot second_last_snapshot_id=table.snapshots()[-2].snapshot_id print(f"Second last SnapshotID: {second_last_snapshot_id}")

Per un elenco dettagliato dei metadati disponibili, consulta la sezione di riferimento al codice dei metadati della documentazione. PyIceberg

Usare il viaggio nel tempo

È possibile utilizzare le istantanee delle tabelle per viaggiare nel tempo per accedere agli stati precedenti della tabella. Ecco come visualizzare lo stato della tabella prima dell'ultima operazione:

second_last_snapshot_id=table.snapshots()[-2].snapshot_id time_travel_df = table.scan( limit=100, snapshot_id=second_last_snapshot_id ).to_pandas() print(time_travel_df)

Per un elenco completo delle funzioni disponibili, consulta la documentazione dell'API PyIceberg Python.