Migrazione del connettore Spark Kinesis all'SDK 2.x per Amazon EMR 7.0 - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Migrazione del connettore Spark Kinesis all'SDK 2.x per Amazon EMR 7.0

L' AWS SDK fornisce un ricco set di API e librerie per interagire con i servizi di AWS cloud computing, come la gestione delle credenziali, la connessione ai servizi S3 e Kinesis. Il connettore Spark Kinesis viene utilizzato per consumare dati dal flusso di dati Kinesis e i dati ricevuti vengono trasformati ed elaborati nel motore di esecuzione di Spark. Attualmente questo connettore è basato su 1.x di AWS SDK e K (KCL). inesis-client-library

Come parte della migrazione all' AWS SDK 2.x, anche il connettore Spark Kinesis viene aggiornato di conseguenza per funzionare con l'SDK 2.x. Nella versione 7.0 di Amazon EMR, Spark contiene l'aggiornamento SDK 2.x che non è ancora disponibile nella versione community di Apache Spark. Se utilizzi il connettore Spark Kinesis da una versione precedente alla 7.0, devi effettuare la migrazione dei codici dell'applicazione per eseguirli su SDK 2.x prima di poterla effettuare su Amazon EMR 7.0.

Guide alla migrazione

Questa sezione descrive i passaggi per eseguire la migrazione di un'applicazione al connettore Spark Kinesis aggiornato. Include guide per la migrazione alla Kinesis Client Library (KCL) 2.x AWS , fornitori di credenziali AWS e client di servizi in SDK 2.x. AWS A titolo di riferimento, include anche un WordCountprogramma di esempio che utilizza il connettore Kinesis.

Migrazione di KLC da 1.x a 2.x

  • Parametri, livello e dimensioni in KinesisInputDStream

    Quando crei un'istanza KinesisInputDStream, puoi controllare il livello e le dimensioni dei parametri per il flusso. L'esempio seguente mostra come personalizzare questi parametri con KCL 1.x:

    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(KinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet) .build()

    In KCL 2.x, queste impostazioni di configurazione hanno nomi di pacchetto diversi. Per eseguire la migrazione a 2.x:

    1. Modifica le istruzioni di importazione per com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration e com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel rispettivamente in software.amazon.kinesis.metrics.MetricsLevel e software.amazon.kinesis.metrics.MetricsUtil.

      // import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel import software.amazon.kinesis.metrics.MetricsLevel // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration import software.amazon.kinesis.metrics.MetricsUtil
    2. Sostituisci la riga metricsEnabledDimensionsKinesisClientLibConfiguration.DEFAULT_METRICS_ENABLED_DIMENSIONS.asScala.toSet con metricsEnabledDimensionsSet(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)

    Di seguito è riportata una versione aggiornata di KinesisInputDStream con livello e dimensioni dei parametri personalizzati:

    import software.amazon.kinesis.metrics.MetricsLevel import software.amazon.kinesis.metrics.MetricsUtil val kinesisStream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build()
  • Funzione del gestore di messaggi in KinesisInputDStream

    Quando crei un'istanza KinesisInputDStream, puoi anche fornire una "funzione del gestore di messaggi" che accetta un Kinesis Record e restituisce un oggetto generico T, nel caso in cui desideri utilizzare altri dati inclusi in un Record come la chiave di partizione.

    In KCL 1.x, la firma della funzione del gestore di messaggi è Record => T, dove Record è com.amazonaws.services.kinesis.model.Record. In KCL 2.x, la firma del gestore viene modificata in:KinesisClientRecord => T, where is. KinesisClientRecord software.amazon.kinesis.retrieval.KinesisClientRecord

    Di seguito è riportato un esempio di fornitura di un gestore di messaggi in KCL 1.x:

    import com.amazonaws.services.kinesis.model.Record def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Per eseguire la migrazione del gestore di messaggi:

    1. Modifica l'istruzione di importazione per com.amazonaws.services.kinesis.model.Record in software.amazon.kinesis.retrieval.KinesisClientRecord.

      // import com.amazonaws.services.kinesis.model.Record import software.amazon.kinesis.retrieval.KinesisClientRecord
    2. Aggiorna la firma del metodo del gestore di messaggi.

      //def addFive(r: Record): Int = JavaUtils.bytesToString(r.getData).toInt + 5 def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5

    Di seguito è riportato un esempio aggiornato di fornitura di un gestore di messaggi in KCL 2.x:

    import software.amazon.kinesis.retrieval.KinesisClientRecord def addFive = (r: KinesisClientRecord) => JavaUtils.bytesToString(r.data()).toInt + 5 val stream = KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(Seconds(10)) .storageLevel(StorageLevel.MEMORY_ONLY) .buildWithMessageHandler(addFive)

    Per ulteriori informazioni sulla migrazione da KCL 1.x a 2.x, consulta Migrazione dei consumatori da KCL 1.x a KCL 2.x.

Migrazione dei fornitori di AWS credenziali da SDK 1.x a 2.x AWS

I fornitori di credenziali vengono utilizzati per ottenere credenziali per le interazioni con. AWS AWS Esistono diverse modifiche all'interfaccia e alla classe relative ai fornitori di credenziali in SDK 2.x, che sono disponibili qui. Il connettore Spark Kinesis ha definito un'interfaccia org.apache.spark.streaming.kinesis.SparkAWSCredentials () e classi di implementazione che restituiscono la versione AWS 1.x dei provider di credenziali. Questi fornitori di credenziali sono necessari per inizializzare i client Kinesis. Ad esempio, se utilizzi il metodo SparkAWSCredentials.provider nelle applicazioni, dovrai aggiornare i codici per utilizzare la versione 2.x dei provider di credenziali. AWS

Di seguito è riportato un esempio di utilizzo dei provider di credenziali in AWS SDK 1.x:

import org.apache.spark.streaming.kinesis.SparkAWSCredentials import com.amazonaws.auth.AWSCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert(credentialProvider.isInstanceOf[AWSCredentialsProvider], "Type should be AWSCredentialsProvider")
Per eseguire la migrazione a SDK 2.x:
  1. Modifica l'istruzione di importazione per com.amazonaws.auth.AWSCredentialsProvider in software.amazon.awssdk.auth.credentials.AwsCredentialsProvider

    //import com.amazonaws.auth.AWSCredentialsProvider import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider
  2. Aggiorna i codici rimanenti che utilizzano questa classe.

    import org.apache.spark.streaming.kinesis.SparkAWSCredentials import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider val basicSparkCredentials = SparkAWSCredentials.builder .basicCredentials("accessKey", "secretKey") .build() val credentialProvider = basicSparkCredentials.provider assert (credentialProvider.isInstanceOf[AwsCredentialsProvider], "Type should be AwsCredentialsProvider")

Migrazione dei client di AWS servizio da AWS SDK 1.x a 2.x

AWS i client di servizio hanno nomi di pacchetto diversi in 2.x (cioèsoftware.amazon.awssdk), mentre l'SDK 1.x lo utilizza. com.amazonaws Per ulteriori informazioni sulle modifiche del client, consulta questa pagina. Se utilizzi questi client del servizio nei codici, devi eseguire la migrazione dei client di conseguenza.

Di seguito è riportato un esempio di creazione di un client in SDK 1.x:

import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient import com.amazonaws.services.dynamodbv2.document.DynamoDB AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient();
Per eseguire la migrazione a 2.x:
  1. Modifica le istruzioni di importazione per i client del servizio. Prendiamo ad esempio i client DynamoDB. Devi cambiare com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient o com.amazonaws.services.dynamodbv2.document.DynamoDB in software.amazon.awssdk.services.dynamodb.DynamoDbClient.

    // import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient // import com.amazonaws.services.dynamodbv2.document.DynamoDB import software.amazon.awssdk.services.dynamodb.DynamoDbClient
  2. Aggiorna i codici che inizializzano i client

    // AmazonDynamoDB ddbClient = AmazonDynamoDBClientBuilder.defaultClient(); // AmazonDynamoDBClient ddbClient = new AmazonDynamoDBClient(); DynamoDbClient ddbClient = DynamoDbClient.create(); DynamoDbClient ddbClient = DynamoDbClient.builder().build();

    Per ulteriori informazioni sulla migrazione dell' AWS SDK da 1.x a 2.x, consulta Cosa c'è di diverso tra SDK for AWS Java 1.x e 2.x

Esempi di codice per applicazioni di streaming

import java.net.URI import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider import software.amazon.awssdk.http.apache.ApacheHttpClient import software.amazon.awssdk.services.kinesis.KinesisClient import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest import software.amazon.awssdk.regions.Region import software.amazon.kinesis.metrics.{MetricsLevel, MetricsUtil} import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Milliseconds, StreamingContext} import org.apache.spark.streaming.dstream.DStream.toPairDStreamFunctions import org.apache.spark.streaming.kinesis.KinesisInitialPositions.Latest import org.apache.spark.streaming.kinesis.KinesisInputDStream object KinesisWordCountASLSDKV2 { def main(args: Array[String]): Unit = { val appName = "demo-app" val streamName = "demo-kinesis-test" val endpointUrl = "https://kinesis.us-west-2.amazonaws.com" val regionName = "us-west-2" // Determine the number of shards from the stream using the low-level Kinesis Client // from the AWS Java SDK. val credentialsProvider = DefaultCredentialsProvider.create require(credentialsProvider.resolveCredentials() != null, "No AWS credentials found. Please specify credentials using one of the methods specified " + "in https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/credentials.html") val kinesisClient = KinesisClient.builder() .credentialsProvider(credentialsProvider) .region(Region.US_WEST_2) .endpointOverride(URI.create(endpointUrl)) .httpClientBuilder(ApacheHttpClient.builder()) .build() val describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build() val numShards = kinesisClient.describeStream(describeStreamRequest) .streamDescription .shards .size // In this example, we are going to create 1 Kinesis Receiver/input DStream for each shard. // This is not a necessity; if there are less receivers/DStreams than the number of shards, // then the shards will be automatically distributed among the receivers and each receiver // will receive data from multiple shards. val numStreams = numShards // Spark Streaming batch interval val batchInterval = Milliseconds(2000) // Kinesis checkpoint interval is the interval at which the DynamoDB is updated with information // on sequence number of records that have been received. Same as batchInterval for this // example. val kinesisCheckpointInterval = batchInterval // Setup the SparkConfig and StreamingContext val sparkConfig = new SparkConf().setAppName("KinesisWordCountASLSDKV2") val ssc = new StreamingContext(sparkConfig, batchInterval) // Create the Kinesis DStreams val kinesisStreams = (0 until numStreams).map { i => KinesisInputDStream.builder .streamingContext(ssc) .streamName(streamName) .endpointUrl(endpointUrl) .regionName(regionName) .initialPosition(new Latest()) .checkpointAppName(appName) .checkpointInterval(kinesisCheckpointInterval) .storageLevel(StorageLevel.MEMORY_AND_DISK_2) .metricsLevel(MetricsLevel.DETAILED) .metricsEnabledDimensions(Set(MetricsUtil.OPERATION_DIMENSION_NAME, MetricsUtil.SHARD_ID_DIMENSION_NAME)) .build() } // Union all the streams val unionStreams = ssc.union(kinesisStreams) // Convert each line of Array[Byte] to String, and split into words val words = unionStreams.flatMap(byteArray => new String(byteArray).split(" ")) // Map each word to a (word, 1) tuple so we can reduce by key to count the words val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _) // Print the first 10 wordCounts wordCounts.print() // Start the streaming context and await termination ssc.start() ssc.awaitTermination() } }

Considerazioni sull'utilizzo del connettore Spark Kinesis aggiornato

  • Se le tue applicazioni utilizzano la versione Kinesis-producer-library con la versione di JDK precedente alla 11, potresti imbatterti in eccezioni come java.lang.NoClassDefFoundError: javax/xml/bind/DatatypeConverter. Ciò accade perché EMR 7.0 viene fornito con JDK 17 per impostazione predefinita e i moduli J2EE sono stati rimossi dalle librerie standard a partire da Java 11+. Questo problema può essere risolto aggiungendo la seguente dipendenza nel file pom. Sostituisci la versione della libreria con una versione che pensi possa essere più adeguata.

    <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>${jaxb-api.version}</version> </dependency>
  • Il jar del connettore Spark Kinesis si trova in questo percorso dopo la creazione di un cluster EMR: /usr/lib/spark/connector/lib/