使用 Spark EXPLAIN 语句对 Spark SQL 进行故障排除 - Amazon Athena

使用 Spark EXPLAIN 语句对 Spark SQL 进行故障排除

您可以将 Spark EXPLAIN 语句与 Spark SQL 结合使用,以对 Spark 代码进行故障排除。以下代码和输出示例显示了这种用法。

例 – Spark SELECT 语句
spark.sql("select * from select_taxi_table").explain(True)

输出

Calculation started (calculation_id=20c1ebd0-1ccf-ef14-db35-7c1844876a7e) in (session=24c1ebcb-57a8-861e-1023-736f5ae55386). Checking calculation status... Calculation completed. == Parsed Logical Plan == 'Project [*] +- 'UnresolvedRelation [select_taxi_table], [], false == Analyzed Logical Plan == VendorID: bigint, passenger_count: bigint, count: bigint Project [VendorID#202L, passenger_count#203L, count#204L] +- SubqueryAlias spark_catalog.spark_demo_database.select_taxi_table +- Relation spark_demo_database.select_taxi_table[VendorID#202L, passenger_count#203L,count#204L] csv == Optimized Logical Plan == Relation spark_demo_database.select_taxi_table[VendorID#202L, passenger_count#203L,count#204L] csv == Physical Plan == FileScan csv spark_demo_database.select_taxi_table[VendorID#202L, passenger_count#203L,count#204L] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths) [s3://DOC-EXAMPLE-BUCKET/select_taxi], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<VendorID:bigint,passenger_count:bigint,count:bigint>
例 – Spark 数据框

以下代码示例显示如何将 EXPLAIN 与 Spark 数据框结合使用。

taxi1_df=taxi_df.groupBy("VendorID", "passenger_count").count() taxi1_df.explain("extended")

输出

Calculation started (calculation_id=d2c1ebd1-f9f0-db25-8477-3effc001b309) in (session=24c1ebcb-57a8-861e-1023-736f5ae55386). Checking calculation status... Calculation completed. == Parsed Logical Plan == 'Aggregate ['VendorID, 'passenger_count], ['VendorID, 'passenger_count, count(1) AS count#321L] +- Relation [VendorID#49L,tpep_pickup_datetime#50,tpep_dropoff_datetime#51, passenger_count#52L,trip_distance#53,RatecodeID#54L,store_and_fwd_flag#55, PULocationID#56L,DOLocationID#57L,payment_type#58L,fare_amount#59, extra#60,mta_tax#61,tip_amount#62,tolls_amount#63,improvement_surcharge#64, total_amount#65,congestion_surcharge#66,airport_fee#67] parquet == Analyzed Logical Plan == VendorID: bigint, passenger_count: bigint, count: bigint Aggregate [VendorID#49L, passenger_count#52L], [VendorID#49L, passenger_count#52L, count(1) AS count#321L] +- Relation [VendorID#49L,tpep_pickup_datetime#50,tpep_dropoff_datetime#51, passenger_count#52L,trip_distance#53,RatecodeID#54L,store_and_fwd_flag#55, PULocationID#56L,DOLocationID#57L,payment_type#58L,fare_amount#59,extra#60, mta_tax#61,tip_amount#62,tolls_amount#63,improvement_surcharge#64, total_amount#65,congestion_surcharge#66,airport_fee#67] parquet == Optimized Logical Plan == Aggregate [VendorID#49L, passenger_count#52L], [VendorID#49L, passenger_count#52L, count(1) AS count#321L] +- Project [VendorID#49L, passenger_count#52L] +- Relation [VendorID#49L,tpep_pickup_datetime#50,tpep_dropoff_datetime#51, passenger_count#52L,trip_distance#53,RatecodeID#54L,store_and_fwd_flag#55, PULocationID#56L,DOLocationID#57L,payment_type#58L,fare_amount#59,extra#60, mta_tax#61,tip_amount#62,tolls_amount#63,improvement_surcharge#64, total_amount#65,congestion_surcharge#66,airport_fee#67] parquet == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[VendorID#49L, passenger_count#52L], functions=[count(1)], output=[VendorID#49L, passenger_count#52L, count#321L]) +- Exchange hashpartitioning(VendorID#49L, passenger_count#52L, 1000), ENSURE_REQUIREMENTS, [id=#531] +- HashAggregate(keys=[VendorID#49L, passenger_count#52L], functions=[partial_count(1)], output=[VendorID#49L, passenger_count#52L, count#326L]) +- FileScan parquet [VendorID#49L,passenger_count#52L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[s3://DOC-EXAMPLE-BUCKET/ notebooks/yellow_tripdata_2016-01.parquet], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<VendorID:bigint,passenger_count:bigint>