本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
撰寫 Spark 應用程式
可以使用 Scala、Java 或 Python 來撰寫 Spark$SPARK_HOME/examples
和 中檢視完整的範例GitHub
Scala
為了避免 Scala 相容性問題,我們建議您在編譯 Amazon EMR叢集的 Spark 應用程式時,使用 Spark 相依性做為正確的 Scala 版本。您應該使用的 Scala 版本取決於您的叢集上安裝的 Spark 版本。例如,Amazon 5.30.1 EMR版使用 Spark 2.4.5,這是使用 Scala 2.11 建置。如果您的叢集使用 Amazon 5.30.1 EMR版,請使用 Scala 2.11 的 Spark 相依性。如需有關 Spark 使用之 Scala 版本的詳細資訊,請參閱 Apache Spark 文件
package org.apache.spark.examples import scala.math.random import org.apache.spark._ /** Computes an approximation to pi */ object SparkPi { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Spark Pi") val spark = new SparkContext(conf) val slices = if (args.length > 0) args(0).toInt else 2 val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow val count = spark.parallelize(1 until n, slices).map { i => val x = random * 2 - 1 val y = random * 2 - 1 if (x*x + y*y < 1) 1 else 0 }.reduce(_ + _) println("Pi is roughly " + 4.0 * count / n) spark.stop() } }
Java
package org.apache.spark.examples; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import java.util.ArrayList; import java.util.List; /** * Computes an approximation to pi * Usage: JavaSparkPi [slices] */ public final class JavaSparkPi { public static void main(String[] args) throws Exception { SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi"); JavaSparkContext jsc = new JavaSparkContext(sparkConf); int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); for (int i = 0; i < n; i++) { l.add(i); } JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); int count = dataSet.map(new Function<Integer, Integer>() { @Override public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; } }).reduce(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); System.out.println("Pi is roughly " + 4.0 * count / n); jsc.stop(); } }
Python
import argparse import logging from operator import add from random import random from pyspark.sql import SparkSession logger = logging.getLogger(__name__) logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") def calculate_pi(partitions, output_uri): """ Calculates pi by testing a large number of random numbers against a unit circle inscribed inside a square. The trials are partitioned so they can be run in parallel on cluster instances. :param partitions: The number of partitions to use for the calculation. :param output_uri: The URI where the output is written, typically an Amazon S3 bucket, such as 's3://example-bucket/pi-calc'. """ def calculate_hit(_): x = random() * 2 - 1 y = random() * 2 - 1 return 1 if x**2 + y**2 < 1 else 0 tries = 100000 * partitions logger.info( "Calculating pi with a total of %s tries in %s partitions.", tries, partitions ) with SparkSession.builder.appName("My PyPi").getOrCreate() as spark: hits = ( spark.sparkContext.parallelize(range(tries), partitions) .map(calculate_hit) .reduce(add) ) pi = 4.0 * hits / tries logger.info("%s tries and %s hits gives pi estimate of %s.", tries, hits, pi) if output_uri is not None: df = spark.createDataFrame([(tries, hits, pi)], ["tries", "hits", "pi"]) df.write.mode("overwrite").json(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( "--partitions", default=2, type=int, help="The number of parallel partitions to use when calculating pi.", ) parser.add_argument( "--output_uri", help="The URI where output is saved, typically an S3 bucket." ) args = parser.parse_args() calculate_pi(args.partitions, args.output_uri)