Scala スクリプトの例 - ストリーミング ETL - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Scala スクリプトの例 - ストリーミング ETL

次のスクリプト例では、Amazon Kinesis Data Streams に接続し、Data Catalog のスキーマを使用してデータストリームを解析した上で、そのストリームを Amazon S3 にある静的データセットに結合します。結合された結果は Parquet 形式で Amazon S3 に出力されます。

// This script connects to an Amazon Kinesis stream, uses a schema from the data catalog to parse the stream, // joins the stream to a static dataset on Amazon S3, and outputs the joined results to Amazon S3 in parquet format. import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import java.util.Calendar import org.apache.spark.SparkContext import org.apache.spark.sql.Dataset import org.apache.spark.sql.Row import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SparkSession import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.streaming.Trigger import scala.collection.JavaConverters._ object streamJoiner { def main(sysArgs: Array[String]) { val spark: SparkContext = new SparkContext() val glueContext: GlueContext = new GlueContext(spark) val sparkSession: SparkSession = glueContext.getSparkSession import sparkSession.implicits._ // @params: [JOB_NAME] val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val staticData = sparkSession.read // read() returns type DataFrameReader .format("csv") .option("header", "true") .load("s3://awsexamplebucket-streaming-demo2/inputs/productsStatic.csv") // load() returns a DataFrame val datasource0 = sparkSession.readStream // readstream() returns type DataStreamReader .format("kinesis") .option("streamName", "stream-join-demo") .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com") .option("startingPosition", "TRIM_HORIZON") .load // load() returns a DataFrame val selectfields1 = datasource0.select(from_json($"data".cast("string"), glueContext.getCatalogSchemaAsSparkSchema("stream-demos", "stream-join-demo2")) as "data").select("data.*") val datasink2 = selectfields1.writeStream.foreachBatch { (dataFrame: Dataset[Row], batchId: Long) => { //foreachBatch() returns type DataStreamWriter val joined = dataFrame.join(staticData, "product_id") val year: Int = Calendar.getInstance().get(Calendar.YEAR) val month :Int = Calendar.getInstance().get(Calendar.MONTH) + 1 val day: Int = Calendar.getInstance().get(Calendar.DATE) val hour: Int = Calendar.getInstance().get(Calendar.HOUR_OF_DAY) if (dataFrame.count() > 0) { joined.write // joined.write returns type DataFrameWriter .mode(SaveMode.Append) .format("parquet") .option("quote", " ") .save("s3://awsexamplebucket-streaming-demo2/output/" + "/year=" + "%04d".format(year) + "/month=" + "%02d".format(month) + "/day=" + "%02d".format(day) + "/hour=" + "%02d".format(hour) + "/") } } } // end foreachBatch() .trigger(Trigger.ProcessingTime("100 seconds")) .option("checkpointLocation", "s3://awsexamplebucket-streaming-demo2/checkpoint/") .start().awaitTermination() // start() returns type StreamingQuery Job.commit() } }