撰寫 Spark 應用程式 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

撰寫 Spark 應用程式

可以使用 Scala、Java 或 Python 來撰寫 Spark 應用程式。在 Apache Spark 文件的 Spark 範例主題中有幾個 Spark 應用程式的範例。估算 Pi 的範例就如在三個原生支援應用程式中所示。您也可以在 $SPARK_HOME/examples和 中檢視完整的範例GitHub。如需如何建置 JARs Spark 的詳細資訊,請參閱 Apache Spark 文件中的快速入門主題。

Scala

為了避免 Scala 相容性問題,我們建議您在編譯 Amazon EMR叢集的 Spark 應用程式時,使用 Spark 相依性做為正確的 Scala 版本。您應該使用的 Scala 版本取決於您的叢集上安裝的 Spark 版本。例如,Amazon 5.30.1 EMR版使用 Spark 2.4.5,這是使用 Scala 2.11 建置。如果您的叢集使用 Amazon 5.30.1 EMR版,請使用 Scala 2.11 的 Spark 相依性。如需有關 Spark 使用之 Scala 版本的詳細資訊,請參閱 Apache Spark 文件

package org.apache.spark.examples import scala.math.random import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() } }

Java

package org.apache.spark.examples; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.List; /** * Computes an approximation to pi * Usage: JavaSparkPi [slices] */ public final class JavaSparkPi { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); for (int i = 0; i < n; i++) { l.add(i); } JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); int count = dataSet.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; } }).reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); System.out.println("Pi is roughly " + 4.0 * count / n); jsc.stop(); } }

Python

import argparse import logging from operator import add from random import random from pyspark.sql import SparkSession logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") def calculate_pi(partitions, output_uri): """ Calculates pi by testing a large number of random numbers against a unit circle inscribed inside a square. The trials are partitioned so they can be run in parallel on cluster instances. :param partitions: The number of partitions to use for the calculation. :param output_uri: The URI where the output is written, typically an Amazon S3 bucket, such as 's3://example-bucket/pi-calc'. """ def calculate_hit(_): x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x**2 + y**2 < 1 else 0 tries = 100000 * partitions logger.info( "Calculating pi with a total of %s tries in %s partitions.", tries, partitions ) with SparkSession.builder.appName("My PyPi").getOrCreate() as spark: hits = ( spark.sparkContext.parallelize(range(tries), partitions) .map(calculate_hit) .reduce(add) ) pi = 4.0 * hits / tries logger.info("%s tries and %s hits gives pi estimate of %s.", tries, hits, pi) if output_uri is not None: df = spark.createDataFrame([(tries, hits, pi)], ["tries", "hits", "pi"]) df.write.mode("overwrite").json(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--partitions", default=2, type=int, help="The number of parallel partitions to use when calculating pi.", ) parser.add_argument( "--output_uri", help="The URI where output is saved, typically an S3 bucket." ) args = parser.parse_args() calculate_pi(args.partitions, args.output_uri)