Lecture et écriture à partir de et vers Amazon Redshift - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Lecture et écriture à partir de et vers Amazon Redshift

Les exemples de code suivants permettent de lire et PySpark d'écrire des exemples de données depuis et vers une base de données Amazon Redshift à l'aide d'une API de source de données et de SparkSQL.

Data source API

PySpark À utiliser pour lire et écrire des exemples de données depuis et vers une base de données Amazon Redshift à l'aide d'une API de source de données.

import boto3 from pyspark.sql import SQLContext sc = # existing SparkContext sql_context = SQLContext(sc) url = "jdbc:redshift:iam://redshifthost:5439/database" aws_iam_role_arn = "arn:aws:iam::accountID:role/roleName" df = sql_context.read \ .format("io.github.spark_redshift_community.spark.redshift") \ .option("url", url) \ .option("dbtable", "tableName") \ .option("tempdir", "s3://path/for/temp/data") \ .option("aws_iam_role", "aws_iam_role_arn") \ .load() df.write \ .format("io.github.spark_redshift_community.spark.redshift") \ .option("url", url) \ .option("dbtable", "tableName_copy") \ .option("tempdir", "s3://path/for/temp/data") \ .option("aws_iam_role", "aws_iam_role_arn") \ .mode("error") \ .save()
SparkSQL

Permet PySpark de lire et d'écrire des exemples de données depuis et vers une base de données Amazon Redshift à l'aide de SparkSQL.

import boto3 import json import sys import os from pyspark.sql import SparkSession spark = SparkSession \ .builder \ .enableHiveSupport() \ .getOrCreate() url = "jdbc:redshift:iam://redshifthost:5439/database" aws_iam_role_arn = "arn:aws:iam::accountID:role/roleName" bucket = "s3://path/for/temp/data" tableName = "tableName" # Redshift table name s = f"""CREATE TABLE IF NOT EXISTS {tableName} (country string, data string) USING io.github.spark_redshift_community.spark.redshift OPTIONS (dbtable '{tableName}', tempdir '{bucket}', url '{url}', aws_iam_role '{aws_iam_role_arn}' ); """ spark.sql(s) columns = ["country" ,"data"] data = [("test-country","test-data")] df = spark.sparkContext.parallelize(data).toDF(columns) # Insert data into table df.write.insertInto(tableName, overwrite=False) df = spark.sql(f"SELECT * FROM {tableName}") df.show()