Koneksi JDBC - AWS Glue

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Koneksi JDBC

Jenis database tertentu, biasanya relasional, mendukung koneksi melalui standar JDBC. Untuk informasi selengkapnya tentang JDBC, lihat dokumentasi Java JDBC API. AWS Glue secara native mendukung koneksi ke database tertentu melalui konektor JDBC mereka - perpustakaan JDBC disediakan dalam pekerjaan Glue Spark. AWS Saat menghubungkan ke tipe database ini menggunakan pustaka AWS Glue, Anda memiliki akses ke serangkaian opsi standar.

Nilai ConnectionType JDBC meliputi yang berikut:

  • "connectionType": "sqlserver": Mengkhususkan koneksi ke Microsoft SQL Server.

  • "connectionType": "mysql": Mengkhususkan koneksi ke basis data MySQL.

  • "connectionType": "oracle": Mengkhususkan koneksi ke basis data Oracle.

  • "connectionType": "postgresql": Mengkhususkan koneksi ke basis data PostgreSQL.

  • "connectionType": "redshift": Mengkhususkan koneksi ke basis data Amazon Redshift. Untuk informasi selengkapnya, lihat Koneksi Redshift.

Tabel berikut mencantumkan versi driver JDBC yang AWS Glue mendukung.

Produk Versi driver JDBC untuk Glue 4.0 Versi driver JDBC untuk Glue 3.0 Versi driver JDBC untuk Glue 0.9, 1.0, 2.0
Microsoft SQL Server 9.4.0 7.x 6.x
MySQL 8.0.23 8.0.23 5.1
Basis data Oracle 21.7 21.1 11.2
PostgreSQL 42.3.6 42.2.18 42.1.x
MongoDB 4.7.2 4.0.0 2.0.0
Pergeseran Merah Amazon* redshift-jdbc42-2.1.0.16 redshift-jdbc41-1.2.12.1017 redshift-jdbc41-1.2.12.1017

* Untuk jenis koneksi Amazon Redshift, semua pasangan nama/nilai opsi lain yang disertakan dalam opsi koneksi untuk koneksi JDBC, termasuk opsi pemformatan, diteruskan langsung ke SparkSQL yang mendasarinya. DataSource Dalam pekerjaan AWS Glue with Spark di AWS Glue 4.0 dan versi yang lebih baru, konektor asli AWS Glue untuk Amazon Redshift menggunakan integrasi Amazon Redshift untuk Apache Spark. Untuk informasi selengkapnya, lihat integrasi Amazon Redshift untuk Apache Spark. Di versi sebelumnya, lihat Sumber data Amazon Redshift untuk Spark.

Untuk mengonfigurasi VPC Amazon agar tersambung ke penyimpanan data Amazon RDS menggunakan JDBC, lihat. Menyiapkan Amazon VPC untuk koneksi JDBC ke penyimpanan data Amazon RDS AWS Glue

catatan

AWSGlue jobs hanya dikaitkan dengan satu subnet selama menjalankan. Ini dapat memengaruhi kemampuan Anda untuk terhubung ke beberapa sumber data melalui pekerjaan yang sama. Perilaku ini tidak terbatas pada sumber JDBC.

Referensi opsi koneksi JDBC

Jika Anda sudah memiliki koneksi AWS Glue JDBC yang ditentukan, Anda dapat menggunakan kembali properti konfigurasi yang ditentukan di dalamnya, seperti: url, pengguna, dan kata sandi; jadi Anda tidak perlu menentukannya dalam kode sebagai opsi koneksi. Fitur ini tersedia dalam AWS Glue 3.0 dan versi yang lebih baru. Untuk melakukannya, gunakan properti koneksi berikut:

  • "useConnectionProperties": Setel ke “true” untuk menunjukkan Anda ingin menggunakan konfigurasi dari koneksi.

  • "connectionName": Masukkan nama koneksi untuk mengambil konfigurasi dari, koneksi harus ditentukan di wilayah yang sama dengan pekerjaan.

Gunakan opsi koneksi ini dengan koneksi JDBC:

  • "url": (Wajib) URL JDBC untuk basis data.

  • "dbtable": (Wajib) Tabel database untuk dibaca. Untuk penyimpanan data JDBC yang mendukung skema dalam basis data, tentukan schema.table-name. Jika skema tidak disediakan, maka skema "publik" default digunakan.

  • "user": (Wajib) Nama pengguna yang akan digunakan saat terhubung.

  • "password": (Wajib) Kata sandi yang akan digunakan saat terhubung.

  • (Opsional) Opsi berikut memungkinkan Anda untuk memberikan driver JDBC kustom. Gunakan opsi ini jika Anda harus menggunakan driver yang AWS Glue tidak mendukung secara asli.

    Tugas ETL dapat menggunakan versi driver JDBC yang berbeda untuk sumber data dan target, bahkan jika sumber dan target adalah produk basis data yang sama. Hal ini memungkinkan Anda untuk memigrasi data antara sumber dan target basis data dengan versi yang berbeda. Untuk menggunakan opsi ini, Anda harus terlebih dahulu mengunggah file JAR dari driver JDBC ke Amazon S3.

    • "customJdbcDriverS3Path": Jalur Amazon S3 dari driver JDBC khusus.

    • "customJdbcDriverClassName": Nama kelas driver JDBC.

  • "bulkSize": (Opsional) Digunakan untuk mengkonfigurasi sisipan paralel untuk mempercepat beban massal ke target JDBC. Tentukan nilai integer untuk tingkat paralelisme yang akan digunakan saat menulis atau memasukkan data. Opsi ini berguna untuk meningkatkan kinerja penulisan ke dalam database seperti Arch User Repository (AUR).

  • "hashfield"(Opsional) String, digunakan untuk menentukan nama kolom dalam tabel JDBC yang akan digunakan untuk membagi data menjadi partisi saat membaca dari tabel JDBC secara paralel. Berikan “hashfield” ATAU “hashexpression”. Untuk informasi selengkapnya, lihat Membaca dari tabel JDBC secara paralel.

  • "hashexpression"(Opsional) Klausa pilih SQL mengembalikan bilangan bulat. Digunakan untuk membagi data dalam tabel JDBC menjadi partisi saat membaca dari tabel JDBC secara paralel. Berikan “hashfield” ATAU “hashexpression”. Untuk informasi selengkapnya, lihat Membaca dari tabel JDBC secara paralel.

  • "hashpartitions"(Opsional) Sebuah bilangan bulat positif. Digunakan untuk menentukan jumlah pembacaan paralel dari tabel JDBC saat membaca dari tabel JDBC secara paralel. Default: 7. Untuk informasi selengkapnya, lihat Membaca dari tabel JDBC secara paralel.

  • "sampleQuery": (Opsional) Pernyataan kueri SQL kustom. Digunakan untuk menentukan subset informasi dalam tabel untuk mengambil sampel isi tabel. Ketika dikonfigurasi tanpa memperhatikan data Anda, itu bisa kurang efisien daripada DynamicFrame metode, menyebabkan batas waktu atau kehabisan kesalahan memori. Untuk informasi selengkapnya, lihat Gunakan SampleQuery.

  • "enablePartitioningForSampleQuery": (Opsional) Sebuah boolean. Default: false. Digunakan untuk mengaktifkan membaca dari tabel JDBC secara paralel saat menentukan. sampleQuery Jika disetel ke true, sampleQuery harus diakhiri dengan “where” atau “and” agar AWS Glue menambahkan kondisi partisi. Untuk informasi selengkapnya, lihat Gunakan SampleQuery.

  • "sampleSize": (Opsional) Sebuah bilangan bulat positif. Membatasi jumlah baris yang dikembalikan oleh kueri sampel. Bekerja hanya ketika enablePartitioningForSampleQuery itu benar. Jika partisi tidak diaktifkan, Anda harus langsung menambahkan "limit x" sampleQuery untuk membatasi ukuran. Untuk informasi selengkapnya, lihat Gunakan SampleQuery.

Gunakan SampleQuery

Bagian ini menjelaskan cara menggunakansampleQuery, sampleSize danenablePartitioningForSampleQuery.

sampleQuerydapat menjadi cara yang efisien untuk mengambil sampel beberapa baris dataset Anda. Secara default, kueri dijalankan oleh seorang eksekutor tunggal. Ketika dikonfigurasi tanpa memperhatikan data Anda, itu bisa kurang efisien daripada DynamicFrame metode, menyebabkan batas waktu atau kehabisan kesalahan memori. Menjalankan SQL pada database yang mendasarinya sebagai bagian dari pipeline ETL Anda umumnya hanya diperlukan untuk tujuan kinerja. Jika Anda mencoba untuk melihat pratinjau beberapa baris dataset Anda, pertimbangkan untuk menggunakanshow. Jika Anda mencoba untuk mengubah dataset Anda menggunakan SQL, pertimbangkan toDF untuk menggunakan untuk mendefinisikan transformasi SparkSQL terhadap data Anda dalam formulir. DataFrame

Meskipun kueri Anda dapat memanipulasi berbagai tabel, dbtable tetap diperlukan.

Menggunakan SampleQuery untuk mengambil sampel tabel Anda

Saat menggunakan perilaku SampleQuery default untuk mengambil sampel data Anda, AWS Glue tidak mengharapkan throughput yang substansif, sehingga menjalankan kueri Anda pada satu eksekutor. Untuk membatasi data yang Anda berikan dan tidak menyebabkan masalah kinerja, kami sarankan Anda memberikan SQL dengan LIMIT klausa.

contoh Gunakan SampleQuery tanpa partisi

Contoh kode berikut menunjukkan bagaimana menggunakan sampleQuery tanpa partisi.

//A full sql query statement. val query = "select name from $tableName where age > 0 limit 1" val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "sampleQuery" -> query )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

Menggunakan SampleQuery terhadap kumpulan data yang lebih besar

Jika Anda membaca kumpulan data besar, Anda mungkin perlu mengaktifkan partisi JDBC untuk menanyakan tabel secara paralel. Untuk informasi selengkapnya, lihat Membaca dari tabel JDBC secara paralel. Untuk digunakan sampleQuery dengan partisi JDBC, atur ke true. enablePartitioningForSampleQuery Mengaktifkan fitur ini mengharuskan Anda untuk membuat beberapa perubahan pada AndasampleQuery.

Saat menggunakan partisi JDBC dengansampleQuery, kueri Anda harus diakhiri dengan “where” atau “and” agar AWS Glue menambahkan kondisi partisi.

Jika Anda ingin membatasi hasil SampleQuery Anda saat membaca dari tabel JDBC secara paralel, atur "sampleSize" parameter daripada menentukan klausa. LIMIT

contoh Gunakan SampleQuery dengan partisi JDBC

Contoh kode berikut menunjukkan bagaimana menggunakan sampleQuery dengan partisi JDBC.

//note that the query should end with "where" or "and" if use with JDBC partitioning. val query = "select name from $tableName where age > 0 and" //Enable JDBC partitioning by setting hashfield. //to use sampleQuery with partitioning, set enablePartitioningForSampleQuery. //use sampleSize to limit the size of returned data. val connectionOptions = JsonOptions(Map( "url" -> url, "dbtable" -> tableName, "user" -> user, "password" -> password, "hashfield" -> primaryKey, "sampleQuery" -> query, "enablePartitioningForSampleQuery" -> true, "sampleSize" -> "1" )) val dyf = glueContext.getSource("mysql", connectionOptions) .getDynamicFrame()

Catatan dan Pembatasan:

Contoh kueri tidak dapat digunakan bersama dengan bookmark pekerjaan. Status bookmark akan diabaikan ketika konfigurasi untuk keduanya disediakan.

Gunakan driver JDBC khusus

Contoh kode berikut ini menunjukkan cara membaca dan menulis ke basis data JDBC dengan driver JDBC kustom. Mereka menunjukkan membaca dari satu versi produk database, dan menulis ke versi yang lebih baru dari produk yang sama.

Python
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext, SparkConf from awsglue.context import GlueContext from awsglue.job import Job import time from pyspark.sql.types import StructType, StructField, IntegerType, StringType sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session # Construct JDBC connection options connection_mysql5_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd"} connection_mysql8_options = { "url": "jdbc:mysql://<jdbc-host-name>:3306/db", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "customJdbcDriverClassName": "com.mysql.cj.jdbc.Driver"} connection_oracle11_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd"} connection_oracle18_options = { "url": "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL", "dbtable": "test", "user": "admin", "password": "pwd", "customJdbcDriverS3Path": "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "customJdbcDriverClassName": "oracle.jdbc.OracleDriver"} # Read from JDBC databases with custom driver df_mysql8 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from MySQL 5 and write to MySQL 8 df_mysql5 = glueContext.create_dynamic_frame.from_options(connection_type="mysql", connection_options=connection_mysql5_options) glueContext.write_from_options(frame_or_dfc=df_mysql5, connection_type="mysql", connection_options=connection_mysql8_options) # Read DynamicFrame from Oracle 11 and write to Oracle 18 df_oracle11 = glueContext.create_dynamic_frame.from_options(connection_type="oracle", connection_options=connection_oracle11_options) glueContext.write_from_options(frame_or_dfc=df_oracle11, connection_type="oracle", connection_options=connection_oracle18_options)
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamicFrame import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { val MYSQL_5_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val MYSQL_8_URI: String = "jdbc:mysql://<jdbc-host-name>:3306/db" val ORACLE_11_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" val ORACLE_18_URI: String = "jdbc:oracle:thin:@//<jdbc-host-name>:1521/ORCL" // Construct JDBC connection options lazy val mysql5JsonOption = jsonOptions(MYSQL_5_URI) lazy val mysql8JsonOption = customJDBCDriverJsonOptions(MYSQL_8_URI, "s3://DOC-EXAMPLE-BUCKET/mysql-connector-java-8.0.17.jar", "com.mysql.cj.jdbc.Driver") lazy val oracle11JsonOption = jsonOptions(ORACLE_11_URI) lazy val oracle18JsonOption = customJDBCDriverJsonOptions(ORACLE_18_URI, "s3://DOC-EXAMPLE-BUCKET/ojdbc10.jar", "oracle.jdbc.OracleDriver") def main(sysArgs: Array[String]): Unit = { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) // Read from JDBC database with custom driver val df_mysql8: DynamicFrame = glueContext.getSource("mysql", mysql8JsonOption).getDynamicFrame() // Read DynamicFrame from MySQL 5 and write to MySQL 8 val df_mysql5: DynamicFrame = glueContext.getSource("mysql", mysql5JsonOption).getDynamicFrame() glueContext.getSink("mysql", mysql8JsonOption).writeDynamicFrame(df_mysql5) // Read DynamicFrame from Oracle 11 and write to Oracle 18 val df_oracle11: DynamicFrame = glueContext.getSource("oracle", oracle11JsonOption).getDynamicFrame() glueContext.getSink("oracle", oracle18JsonOption).writeDynamicFrame(df_oracle11) Job.commit() } private def jsonOptions(url: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd"}""".stripMargin) } private def customJDBCDriverJsonOptions(url: String, customJdbcDriverS3Path: String, customJdbcDriverClassName: String): JsonOptions = { new JsonOptions( s"""{"url": "${url}", |"dbtable":"test", |"user": "admin", |"password": "pwd", |"customJdbcDriverS3Path": "${customJdbcDriverS3Path}", |"customJdbcDriverClassName" : "${customJdbcDriverClassName}"}""".stripMargin) } }