Tipi e opzioni di connessione per ETL in AWS Glue per Spark - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Tipi e opzioni di connessione per ETL in AWS Glue per Spark

In AWS Glue per Spark, diversi metodi e trasformazioni PySpark e Scala specificano il tipo di connessione utilizzando un parametro connectionType. Specificano le opzioni di connessione utilizzando un parametro connectionOptions o options.

Il parametro connectionType può assumere i valori indicati nella tabella seguente. I valori dei parametri associati connectionOptions (o options) per ciascun tipo sono documentati nelle sezioni seguenti. Salvo indicazione contraria, i parametri si applicano quando la connessione viene utilizzata come sorgente o sink.

Per il codice di esempio che illustra l'impostazione e l'utilizzo delle opzioni di connessione, consulta la home page per ogni tipo di connessione.

connectionType Si connette a
dynamodb Amazon DynamoDB database
kinesis Flusso di dati Amazon Kinesis
s3 Amazon S3
documentdb Amazon DocumentDB (con compatibilità MongoDB) database
opensearch Servizio OpenSearch di Amazon.
redshift Database Amazon Redshift
kafka Kafka o Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos per NoSQL.
azuresql Azure SQL.
bigquery Google BigQuery.
mongodb Database MongoDB, incluso MongoDB Atlas.
sqlserver Microsoft SQL Server database (vedere Connessioni JDBC)
mysql MySQL database (vedere Connessioni JDBC)
oracle Oracle database (vedere Connessioni JDBC)
postgresql PostgreSQL database (vedere Connessioni JDBC)
saphana SAP HANA.
snowflake Data lake Snowflake
teradata Teradata Vantage.
vertica Vertica.
personalizzato.* Archivi dati Spark, Athena o JDBC (consulta Valori di personalizzazione e connectionType Marketplace AWS
marketplace.* Archivi dati Spark, Athena o JDBC (consulta Valori di personalizzazione e connectionType Marketplace AWS)

Valori di personalizzazione e connectionType Marketplace AWS

Questi sono i seguenti:

  • "connectionType": "marketplace.athena": designa una connessione a un archivio dati Amazon Athena. La connessione utilizza un connettore di Marketplace AWS.

  • "connectionType": "marketplace.spark": designa una connessione a un archivio dati Apache Spark. La connessione utilizza un connettore di Marketplace AWS.

  • "connectionType": "marketplace.jdbc": designa una connessione a un archivio dati JDBC. La connessione utilizza un connettore di Marketplace AWS.

  • "connectionType": "custom.athena": designa una connessione a un archivio dati Amazon Athena. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.

  • "connectionType": "custom.spark": designa una connessione a un archivio dati Apache Spark. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.

  • "connectionType": "custom.jdbc": designa una connessione a un archivio dati JDBC. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.

Opzioni di connessione per il tipo custom.jdbc o marketplace.jdbc

  • className: stringa, obbligatorio, nome della classe driver.

  • connectionName: stringa, obbligatorio, nome della connessione associata al connettore.

  • url: stringa, obbligatorio, URL JDBC con segnaposto (${}) che vengono utilizzati per creare la connessione all'origine dati. Il segnaposto ${secretKey} viene sostituito con il segreto con lo stesso nome in AWS Secrets Manager. Per ulteriori informazioni sulla creazione dell'URL, fare riferimento alla documentazione dell'archivio dati.

  • secretId o user/password: stringa, obbligatorio, utilizzato per recuperare le credenziali per l'URL.

  • dbTable o query: stringa, obbligatorio, la tabella o la query SQL da cui ottenere i dati. Puoi specificare dbTable o query, ma non entrambi.

  • partitionColumn: stringa, facoltativo, il nome di una colonna intera utilizzata per il partizionamento. Questa opzione funziona solo quando è inclusa con lowerBound, upperBound e numPartitions. Questa opzione funziona allo stesso modo del lettore Spark SQL JDBC. Per ulteriori informazioni, consulta Da JDBC ad altri database nel manuale Apache Spark SQL, DataFrames and Datasets Guide.

    I valori lowerBound e upperBound vengono utilizzati per decidere lo stride della partizione, non per filtrare le righe nella tabella. Tutte le righe della tabella vengono partizionate e restituite.

    Nota

    Quando si utilizza una query anziché un nome di tabella, è necessario verificare che la query funzioni con la condizione di partizionamento specificata. Ad esempio:

    • Se il formato della query è "SELECT col1 FROM table1", testa la query aggiungendo una clausola WHERE alla fine della query che utilizza la colonna della partizione.

    • Se il formato della query è "SELECT col1 FROM table1 WHERE col2=val", testa la query estendendo la clausola WHERE con AND e un'espressione che utilizza la colonna della partizione.

  • lowerBound: intero, facoltativo, il valore minimo di partitionColumn che viene utilizzato per decidere lo stride della partizione.

  • upperBound: intero, facoltativo, il valore massimo di partitionColumn che viene utilizzato per decidere lo stride della partizione.

  • numPartitions: intero, facoltativo, il numero di partizioni. Questo valore, insieme a lowerBound (incluso) e upperBound (escluso), forma stride di partizione per espressioni con le clausole WHERE generate che vengono utilizzate per dividere la partitionColumn.

    Importante

    Presta attenzione al numero di partizioni perché troppe partizioni potrebbero causare problemi nei sistemi di database esterni.

  • filterPredicate: stringa, opzionale, clausola condizione extra per filtrare i dati dall'origine. Ad esempio:

    BillingCity='Mountain View'

    Quando si utilizza una query anziché un nome di table, è necessario verificare che la query funzioni con il filterPredicate specificato. Ad esempio:

    • Se il formato della query è "SELECT col1 FROM table1", testa la query aggiungendo una clausola WHERE alla fine della query che utilizza il predicato filtro.

    • Se il formato della query è "SELECT col1 FROM table1 WHERE col2=val", testa la query estendendo la clausola WHERE con AND e un'espressione che utilizza il predicato filtro.

  • dataTypeMapping: dizionario, opzionale, mappatura del tipo di dati personalizzata che crea una mappatura da un tipo di dati JDBC a un tipo di dati Glue. Ad esempio, l'opzione "dataTypeMapping":{"FLOAT":"STRING"} mappa i campi di dati di tipo JDBC FLOAT nel tipo Java String chiamando il metodo ResultSet.getString() del driver e lo usa per costruire registri di AWS Glue. L'oggetto ResultSet viene implementato da ciascun driver, quindi il comportamento è specifico del driver utilizzato. Consulta la documentazione relativa al driver JDBC per capire come il driver esegue le conversioni.

  • I tipi di dati AWS Glue correntemente supportati sono:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    I tipi di dati JDBC supportati sono Java8 java.sql.types.

    Le mappature di default dei tipi di dati (da JDBC a AWS Glue) sono:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    Se si utilizza un mapping del tipo di dati personalizzato con l'opzione dataTypeMapping, è possibile sovrascrivere una mappatura di default del tipo di dati. Sono interessati solo i tipi di dati JDBC elencati nell'opzione dataTypeMapping; per tutti gli altri tipi di dati JDBC viene utilizzata la mappatura di default. Se necessario, è possibile aggiungere mappature per tipi di dati JDBC aggiuntivi. Se un tipo di dati JDBC non è incluso nella mappatura di default o in una mappatura personalizzata, per impostazione predefinita viene convertito nel tipo di dati STRING AWS Glue.

Negli esempi di codice Python riportati di seguito viene illustrato come leggere dai database JDBC con driver JDBC Marketplace AWS. Mostra la lettura da un database e la scrittura in una posizione S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opzioni di connessione per il tipo custom.athena o marketplace.athena

  • className – Stringa, obbligatorio, nome della classe driver. Quando si utilizza il connettore Athena-CloudWatch, questo valore di parametro è il prefisso della classe Name (ad esempio, "com.amazonaws.athena.connectors"). Il connettore Athena-CloudWatch è composto da due classi: un gestore di metadati e un gestore di registri. Se si fornisce qui il prefisso comune, l'API carica le classi corrette in base a tale prefisso.

  • tableName: stringa, obbligatorio, il nome del flusso di log CloudWatch da leggere. In questo frammento di codice viene utilizzato il nome della vista speciale all_log_streams, il che significa che il frame di dati dinamico restituito conterrà i dati di tutti i flussi di log nel gruppo di log.

  • schemaName: stringa, obbligatorio, il nome del gruppo di log CloudWatch da cui leggere. Ad esempio, /aws-glue/jobs/output.

  • connectionName – Stringa, obbligatorio, nome della connessione associata al connettore.

Per ulteriori opzioni per questo connettore, consultare il README del connettore Amazon Athena CloudWatch su GitHub.

Il seguente esempio di codice Python mostra come leggere da un archivio dati Athena utilizzando un connettore Marketplace AWS. Mostra la lettura da Athena e la scrittura in una posizione S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opzioni di connessione per il tipo custom.spark o marketplace.spark

  • className: stringa, obbligatorio, nome della classe del connettore.

  • secretId: stringa, facoltativo, utilizzato per recuperare le credenziali per la connessione del connettore.

  • connectionName – Stringa, obbligatorio, nome della connessione associata al connettore.

  • Altre opzioni dipendono dall'archivio dati. Ad esempio, le opzioni di configurazione di OpenSearch iniziano con il prefisso es, come descritto nella documentazione di Elasticsearch per Apache Hadoop. Le connessioni Spark a Snowflake utilizzano opzioni come sfUser e sfPassword, come descritto in Using the Spark Connector nella guida Connecting to Snowflake.

Il seguente esempio di codice Python mostra come leggere da un archivio dati OpenSearch utilizzando una connessione marketplace.spark.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opzioni generali

Le opzioni in questa sezione sono fornite come connettore connection_options, ma non si applicano specificamente a tale connettore.

I seguenti parametri vengono generalmente utilizzati per la configurazione dei segnalibri. Possono applicarsi ai flussi di lavoro Amazon S3 o JDBC. Per ulteriori informazioni, consulta Utilizzo di segnalibri di processo.

  • jobBookmarkKeys: un array di nomi di colonna.

  • jobBookmarkKeysSortOrder: una stringa che definisce come confrontare i valori in base all'ordinamento. Valori validi: "asc", "desc".

  • useS3ListImplementation: utilizzato per gestire le prestazioni della memoria quando si elencano i contenuti dei bucket Amazon S3. Per ulteriori informazioni, consulta la pagina Optimize memory management in AWS Glue.