Analyze query plan for shuffle
Who this is for:
Architecture / Concept Overview: Analyze query plan for shuffle
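Spark performance bottlenecks typically fall into three categories. I/O-bound queries repeatedly scan the same data and benefit from caching. Shuffle-bound queries move large volumes of data between stages and benefit from broadcast joins, AQE, and shuffle partition tuning. Skew-bound queries leave a few tasks processing far more data than the rest and benefit from salting or AQE skew join handling. Identifying which category a slow query falls into tells you which technique to apply.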
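Finding unnecessary shuffles starts with the physical plan. In Apache Spark 3.x, every shuffle appears as an `Exchange` operator, and a broadcast appears as `BroadcastExchange`. The sketch below is a hypothetical helper (`summarize_plan` is not a Spark API) that counts these operators, plus the join operators they feed, in the text printed by the default (simple) `EXPLAIN` mode. It assumes Apache Spark operator names; Photon-enabled runtimes print differently named operators that these patterns will not match.

```python
import re

# Operator names as printed by Apache Spark 3.x in simple EXPLAIN mode.
# Assumption: Photon-enabled runtimes use other operator names, which are not matched here.
# \bExchange\b does not match inside BroadcastExchange or ReusedExchange.
_PATTERNS = {
    "shuffle_exchange": re.compile(r"\bExchange\b"),
    "broadcast_exchange": re.compile(r"\bBroadcastExchange\b"),
    "sort_merge_join": re.compile(r"\bSortMergeJoin\b"),
    "shuffled_hash_join": re.compile(r"\bShuffledHashJoin\b"),
    "broadcast_hash_join": re.compile(r"\bBroadcastHashJoin\b"),
}


def summarize_plan(plan_text: str) -> dict:
    """Count shuffle- and join-related operators in a physical plan string (hypothetical helper)."""
    # After AQE runs, the plan prints a Final Plan followed by the Initial Plan.
    # Keep only the text before the Initial Plan so operators are not counted twice.
    current_plan = plan_text.split("== Initial Plan ==")[0]
    return {name: len(pattern.findall(current_plan)) for name, pattern in _PATTERNS.items()}
```

In a notebook, `spark.sql("EXPLAIN SELECT ...").first()[0]` returns the plan text as a string that can be passed to this helper. Two shuffle exchanges beneath a sort-merge join usually mean both join inputs are being shuffled; if the join is converted to a broadcast hash join, a broadcast exchange replaces one of them.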
%%{init: {"theme":"base","themeVariables":{"background":"#0B0E14","primaryTextColor":"#E0E6ED","lineColor":"#5D6470","darkMode":true,"primaryColor":"#2E4A4A","secondaryColor":"#374151","secondaryTextColor":"#E0E6ED","tertiaryColor":"#111827","tertiaryTextColor":"#E0E6ED","edgeLabelBackground":"#1f2937"}}}%%
flowchart LR
classDef source fill:#3F4B59,stroke:#9CA3AF,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef ingestion fill:#5A4B36,stroke:#C9A86B,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef processing fill:#535072,stroke:#8E82B4,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef storage fill:#2E4A4A,stroke:#5FAFA8,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef serving fill:#3D5550,stroke:#6BB7AA,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef governance fill:#5A3F52,stroke:#C28BB0,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
SLOW[Slow Query]:::source --> DIAG{Diagnose Bottleneck}:::processing
DIAG --> IO[I/O Bound: Repeated Scans]:::storage
DIAG --> SHUF[Shuffle Bound: Large Data Exchange]:::processing
DIAG --> SKEW[Skew Bound: Uneven Task Distribution]:::governance
IO --> CACHE[Solution: Cache/Persist]:::serving
SHUF --> BC[Solution: Broadcast Join]:::serving
SHUF --> AQE[Solution: AQE + Partition Tuning]:::serving
SKEW --> SALT[Solution: Salting / AQE Skew Join]:::serving
*Performance diagnosis: identify the bottleneck category, then apply the appropriate optimization.*
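The Skew Bound branch maps onto AQE's skew detection rule, using the two skew settings listed in the Configuration Reference: a shuffle partition counts as skewed when it is larger than `skewedPartitionFactor` times the median partition size and also larger than `skewedPartitionThresholdInBytes`. The sketch below is a simplified model of that rule (`find_skewed_partitions` is a hypothetical name, and the partition sizes would come from stage metrics in the Spark UI); it does not model how AQE then splits the skewed partitions.

```python
import statistics

MB = 1024 * 1024


def find_skewed_partitions(partition_sizes, factor=5.0, threshold_bytes=256 * MB):
    """Return indices of partitions a simplified AQE skew check would flag.

    Defaults mirror spark.sql.adaptive.skewJoin.skewedPartitionFactor (5) and
    spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes (256MB).
    """
    # Guard against a zero median so tiny or empty partitions do not all look skewed.
    median = max(statistics.median(partition_sizes), 1)
    return [
        index
        for index, size in enumerate(partition_sizes)
        # Both conditions must hold: relatively large AND absolutely large.
        if size > factor * median and size > threshold_bytes
    ]
```

The absolute threshold keeps small imbalances from triggering splits. For example, one 100MB partition among 10MB partitions exceeds five times the median but stays under 256MB, so this model does not flag it.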
%%{init: {"theme":"base","themeVariables":{"background":"#0B0E14","primaryTextColor":"#E0E6ED","lineColor":"#5D6470","darkMode":true,"primaryColor":"#2E4A4A","secondaryColor":"#374151","secondaryTextColor":"#E0E6ED","tertiaryColor":"#111827","tertiaryTextColor":"#E0E6ED","edgeLabelBackground":"#1f2937"}}}%%
graph TD
classDef source fill:#3F4B59,stroke:#9CA3AF,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef ingestion fill:#5A4B36,stroke:#C9A86B,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef processing fill:#535072,stroke:#8E82B4,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef storage fill:#2E4A4A,stroke:#5FAFA8,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef serving fill:#3D5550,stroke:#6BB7AA,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
classDef governance fill:#5A3F52,stroke:#C28BB0,stroke-width:2px,rx:8,ry:8,color:#E0E6ED
JOIN[Join Strategies]:::processing
JOIN --> BHJ[Broadcast Hash Join]:::serving
JOIN --> SMJ[Sort-Merge Join]:::processing
JOIN --> SHJ[Shuffle Hash Join]:::processing
BHJ --> B1[Small table broadcast to all executors]:::serving
BHJ --> B2[No shuffle required]:::serving
BHJ --> B3[Best for small dimension tables]:::serving
SMJ --> S1[Both sides shuffled and sorted]:::processing
SMJ --> S2[Default for large-large joins]:::processing
SMJ --> S3[Most expensive but most general]:::processing
SHJ --> H1[Both sides shuffled, hash table built per partition]:::processing
SHJ --> H2[Good when each partition of the smaller side fits in memory]:::processing
*Spark join strategies: broadcast hash join avoids a shuffle entirely, while sort-merge join is the most general and typically the most expensive.*
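How Spark picks among these strategies for an inner equi-join can be sketched as a few size checks. The model below is a simplification under stated assumptions: no join hints, known size estimates, and no AQE runtime re-planning (AQE can still convert a sort-merge join to a broadcast join once actual sizes are known). The shuffled hash join condition approximates Spark's rule that the build side must be at least three times smaller than the other side and smaller than the broadcast threshold times the shuffle partition count. `choose_join_strategy` is a hypothetical name, not a Spark API.

```python
MB = 1024 * 1024


def choose_join_strategy(left_bytes, right_bytes, broadcast_threshold=10 * MB,
                         prefer_sort_merge=True, shuffle_partitions=200):
    """Simplified model of join selection for an inner equi-join without hints."""
    smaller = min(left_bytes, right_bytes)
    larger = max(left_bytes, right_bytes)

    # spark.sql.autoBroadcastJoinThreshold = -1 disables automatic broadcast.
    if broadcast_threshold >= 0 and smaller <= broadcast_threshold:
        return "BroadcastHashJoin"

    # Shuffled hash join is only considered when spark.sql.join.preferSortMergeJoin is false,
    # the per-partition hash table should fit, and the build side is much smaller.
    if (not prefer_sort_merge
            and smaller < broadcast_threshold * shuffle_partitions
            and smaller * 3 <= larger):
        return "ShuffledHashJoin"

    # Default for large-large joins.
    return "SortMergeJoin"
```

Under this model, joining a 5MB dimension table to a 1GB fact table broadcasts the dimension table, while two 1GB-plus tables fall back to a sort-merge join.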
Key Terms
Prerequisites and Setup
- A Databricks cluster with the Spark UI accessible.
- Tables to query and analyze in Unity Catalog.
- Familiarity with the Spark UI (Jobs, Stages, and SQL tabs; per-task metrics appear on each stage's detail page).
Step-by-Step Implementation
Configuration Reference
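| Parameter | Description | Default |
|---|---|---|
| spark.sql.autoBroadcastJoinThreshold | Maximum estimated size of a join side that Spark broadcasts automatically (-1 disables) | 10MB |
| spark.sql.shuffle.partitions | Number of partitions for shuffles in joins and aggregations (the starting count when AQE coalescing is enabled) | 200 |
| spark.sql.adaptive.enabled | Enable Adaptive Query Execution (AQE) | true |
| spark.sql.adaptive.coalescePartitions.enabled | Let AQE merge small contiguous shuffle partitions | true |
| spark.sql.adaptive.advisoryPartitionSizeInBytes | Target shuffle partition size when AQE coalesces small partitions or splits skewed ones | 64MB |
| spark.sql.adaptive.skewJoin.enabled | Let AQE split skewed partitions in shuffle joins | true |
| spark.sql.adaptive.skewJoin.skewedPartitionFactor | A partition is skewed if it is larger than this factor times the median partition size | 5 |
| spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | A skewed partition must also be larger than this size | 256MB |
| spark.databricks.io.cache.enabled | Enable the Databricks disk cache (formerly Delta cache) | Depends on worker instance type |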
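The interaction between spark.sql.shuffle.partitions and spark.sql.adaptive.advisoryPartitionSizeInBytes can be illustrated with a greedy model of AQE coalescing: walk the shuffle partitions in order and start a new group whenever adding the next partition would exceed the advisory size. This is a simplified sketch (`coalesce_partitions` is a hypothetical name). Spark's real implementation also applies a minimum partition size, coalesces all shuffles feeding a stage together, and on Spark 3.2+ may relax the advisory size in favor of parallelism when spark.sql.adaptive.coalescePartitions.parallelismFirst is true.

```python
MB = 1024 * 1024


def coalesce_partitions(partition_sizes, advisory_size=64 * MB):
    """Greedily group contiguous shuffle partitions so each group stays near advisory_size.

    Simplified model of AQE coalescing; returns lists of original partition indices.
    """
    groups, current, current_size = [], [], 0
    for index, size in enumerate(partition_sizes):
        # Close the current group when adding this partition would overshoot the target.
        if current and current_size + size > advisory_size:
            groups.append(current)
            current, current_size = [], 0
        current.append(index)
        current_size += size
    if current:
        groups.append(current)
    return groups
```

Under this model, 200 shuffle partitions of about 1MB each (the default partition count on a small dataset) collapse into four reduce tasks instead of 200, which is the overhead AQE coalescing is meant to remove.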