"No one is harder on a talented person than the person themselves" - Linda Wilkinson ; "Trust your guts and don't follow the herd" ; "Validate direction not destination" ;

February 04, 2020

Physical Plans in Spark SQL - David Vrba (Socialbakers) - Part I

Most techniques for reading SQL Server (RDBMS) execution plans apply here as well: the strategies are the same, only the patterns differ.

Summary
  • Analyzing social media data in Spark
  • Query plans in Spark

Theory - Query Execution
  • Three layers: logical planning, physical planning and execution
Step 1 - Logical Planning (Building the Query Tree)
  • Similar to the query tree in SQL Server (filters, joins, operators)
Step 2 - Physical Planning
  • Bridge between the DataFrame's logical plan and the RDDs that are executed
  • Tree structure of operators with execution-specific details
  • Decides how each operation is executed (choice of algorithm)
  • Spark Plan (generated from the optimized logical plan using strategies)
  • SortMergeJoin and BroadcastHashJoin are the most commonly used join algorithms
  • Executed Plan (final version of the physical plan)
  • Generates the RDD code that is passed to the execution layer
  • df.queryExecution.sparkPlan - shows the Spark Plan
  • df.queryExecution.executedPlan - shows the Executed Plan
  • df.explain() - prints the physical plan
  • Spark UI, SQL tab - shows the plan of each executed query


Frequently used operators
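  • FileScan - reads data from a file-based source (e.g. Parquet)
  • PartitionFilters - filters on partition columns; whole partition directories are skipped
  • DataFilters - filters on non-partition (data) columns
  • PushedFilters - the filters pushed down to the file format reader, which can use them to skip data (e.g. Parquet row groups)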
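The difference between partition filters and data filters can be sketched with a simplified model (not Spark's implementation). It assumes a hypothetical table partitioned by `country`, stored as one directory per value, with hypothetical `(user, likes)` rows; PushedFilters are not modeled.

```scala
// Simplified model: a table partitioned by `country` is stored as one directory
// per value, e.g. /data/country=CZ/part-0. Names and data are hypothetical.
final case class DataFile(path: String, country: String, rows: Seq[(String, Int)])

object ScanFilters {
  // The partition filter is checked against the directory value, so files that
  // do not match are never opened. The data filter is then applied to the rows
  // of the files that survive pruning.
  def scan(files: Seq[DataFile],
           partitionFilter: String => Boolean,
           dataFilter: ((String, Int)) => Boolean): (Seq[String], Seq[(String, Int)]) = {
    val opened = files.filter(f => partitionFilter(f.country))
    val rows = opened.flatMap(_.rows).filter(dataFilter)
    (opened.map(_.path), rows)
  }

  def main(args: Array[String]): Unit = {
    val files = Seq(
      DataFile("/data/country=CZ/part-0", "CZ", Seq(("alice", 120), ("bob", 15))),
      DataFile("/data/country=US/part-0", "US", Seq(("carol", 300)))
    )
    // Roughly: WHERE country = 'CZ' AND likes > 100
    val (opened, rows) = scan(files, _ == "CZ", { case (_, likes) => likes > 100 })
    println(opened) // List(/data/country=CZ/part-0)
    println(rows)   // List((alice,120))
  }
}
```

Only the CZ directory is read; the US file is skipped without being opened, which is why filters on partition columns are so cheap.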

Reference: https://spark.apache.org/docs/2.3.1/sql-programming-guide.html
Key parameters
  • spark.sql.files.maxPartitionBytes - maximum number of bytes to pack into a single partition when reading files (default 128 MB)
  • spark.sql.files.openCostInBytes - estimated cost to open a file, measured by the number of bytes that could be scanned in the same time (default 4 MB)
  • spark.sql.broadcastTimeout - timeout in seconds for the broadcast wait time in broadcast joins (default 300)
  • spark.sql.autoBroadcastJoinThreshold - maximum size in bytes of a table that will be broadcast to all worker nodes when performing a join (default 10 MB); setting it to -1 disables broadcasting
  • spark.sql.shuffle.partitions - number of partitions to use when shuffling data for joins or aggregations (default 200)
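How maxPartitionBytes and openCostInBytes interact when files are read can be sketched as follows. This is a simplified model based on my reading of Spark 2.x's FileSourceScanExec, assuming every file is splittable and ignoring bucketing; newer Spark versions moved this logic and may differ. `defaultParallelism` stands for the number of cores available to the application.

```scala
// Simplified model of packing files into read partitions (Spark 2.x style).
// Assumptions: all files are splittable, bucketing is ignored.
object FilePacking {
  val MB: Long = 1024L * 1024L

  def maxSplitBytes(fileSizes: Seq[Long], maxPartitionBytes: Long,
                    openCostInBytes: Long, defaultParallelism: Int): Long = {
    // Every file is charged its size plus the fixed "open cost".
    val totalBytes = fileSizes.map(_ + openCostInBytes).sum
    val bytesPerCore = totalBytes / defaultParallelism
    math.min(maxPartitionBytes, math.max(openCostInBytes, bytesPerCore))
  }

  /** Returns the read partitions, each as the list of split sizes it reads. */
  def pack(fileSizes: Seq[Long], maxPartitionBytes: Long = 128 * MB,
           openCostInBytes: Long = 4 * MB, defaultParallelism: Int = 1): Seq[Seq[Long]] = {
    val maxSplit = maxSplitBytes(fileSizes, maxPartitionBytes, openCostInBytes, defaultParallelism)
    // Cut large files into chunks of at most maxSplit bytes, biggest chunks first.
    val splits = fileSizes.flatMap { len =>
      (0L until len by maxSplit).map(offset => math.min(maxSplit, len - offset))
    }.sortBy(-_)

    val partitions = scala.collection.mutable.ArrayBuffer.empty[Seq[Long]]
    val current = scala.collection.mutable.ArrayBuffer.empty[Long]
    var currentSize = 0L
    def close(): Unit = {
      if (current.nonEmpty) partitions += current.toList
      current.clear()
      currentSize = 0L
    }
    splits.foreach { s =>
      if (currentSize + s > maxSplit) close()
      currentSize += s + openCostInBytes // opening each chunk "costs" openCostInBytes
      current += s
    }
    close()
    partitions.toList
  }

  def main(args: Array[String]): Unit = {
    println(pack(Seq(300 * MB)).size)        // one 300 MB file -> 3 partitions
    println(pack(Seq.fill(10)(1 * MB)).size) // ten 1 MB files -> 1 partition
  }
}
```

The open cost is what stops many tiny files from being packed into one huge partition: each file counts as at least 4 MB, so small files fill a partition faster than their raw size suggests.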
Exchange
  • Represents a shuffle
  • Physical movement of data across the cluster
  • SinglePartition - all data is moved to a single partition, which can become a bottleneck
  • HashPartitioning - rows are distributed by a hash of the partitioning columns; induced by groupBy, distinct and joins
  • RoundRobinPartitioning - rows are spread evenly over a specified number of partitions, e.g. repartition(n)
  • RangePartitioning - rows are distributed by ranges of values; happens when sorting data (orderBy)
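The four partitioning schemes can be sketched on plain in-memory collections. This is a simplified model, not Spark's code: Spark's HashPartitioning uses a Murmur3 hash (String.hashCode stands in here), RoundRobinPartitioning starts at a random partition, and RangePartitioning estimates its boundaries by sampling rather than sorting everything.

```scala
// Simplified models of the Exchange partitionings on in-memory data.
object Partitionings {
  // Non-negative modulo so that negative hash codes still land in [0, n).
  private def pmod(a: Int, n: Int): Int = ((a % n) + n) % n

  // HashPartitioning: equal keys always land in the same partition.
  def hashPartition(keys: Seq[String], n: Int): Map[Int, Seq[String]] =
    keys.groupBy(k => pmod(k.hashCode, n))

  // RoundRobinPartitioning: rows are dealt out in turn, giving even sizes.
  def roundRobin[A](rows: Seq[A], n: Int): Map[Int, Seq[A]] =
    rows.zipWithIndex.groupBy { case (_, i) => i % n }.map { case (p, xs) => p -> xs.map(_._1) }

  // RangePartitioning: every value in partition p is <= every value in p + 1.
  def rangePartition(values: Seq[Int], n: Int): IndexedSeq[Seq[Int]] = {
    require(values.size >= n, "need at least n values to pick boundaries")
    val sorted = values.sorted.toIndexedSeq
    // Inclusive upper bounds of partitions 0 .. n-2; the last one is open-ended.
    val bounds = (0 until n - 1).map(i => sorted((i + 1) * sorted.size / n - 1))
    def partitionOf(v: Int): Int = bounds.count(b => v > b)
    (0 until n).map(p => values.filter(v => partitionOf(v) == p))
  }

  // SinglePartition: everything ends up in one partition.
  def singlePartition[A](rows: Seq[A]): Seq[Seq[A]] = Seq(rows)
}
```

Hash partitioning is what makes groupBy and joins correct (all rows for a key meet in one task), while range partitioning is what lets each sorted partition simply be concatenated for a global orderBy.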
Aggregate
  • Three implementations: HashAggregate, SortAggregate, ObjectHashAggregate
  • Typical pattern: HashAggregate (partial) -> Exchange (shuffle) -> HashAggregate (final)
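The partial -> shuffle -> final pattern can be sketched for a count per key (roughly what a hypothetical df.groupBy("author").count() does). This is a simplified in-memory model, assuming String.hashCode as the shuffle hash.

```scala
// Simplified model of HashAggregate (partial) -> Exchange -> HashAggregate (final).
object TwoPhaseAggregate {
  type Partition = Seq[String]

  // Partial aggregate: each input partition counts its own keys locally, so only
  // one (key, partialCount) pair per key has to be shuffled.
  def partialCounts(p: Partition): Map[String, Long] =
    p.groupBy(identity).map { case (k, vs) => k -> vs.size.toLong }

  // Exchange: route every partial result to a reducer chosen by hash of the key.
  def shuffle(partials: Seq[Map[String, Long]], n: Int): Seq[Seq[(String, Long)]] = {
    val all = partials.flatMap(_.toSeq)
    (0 until n).map(r => all.filter { case (k, _) => ((k.hashCode % n) + n) % n == r })
  }

  // Final aggregate: each reducer merges the partial counts of the keys it owns.
  def finalCounts(reducer: Seq[(String, Long)]): Map[String, Long] =
    reducer.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }

  def run(partitions: Seq[Partition], n: Int): Map[String, Long] =
    shuffle(partitions.map(partialCounts), n)
      .map(finalCounts)
      .foldLeft(Map.empty[String, Long])(_ ++ _)
}
```

The partial step is why the shuffle in an aggregation is usually much smaller than the input: a partition with a million rows but only ten distinct keys sends just ten records.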
SortMergeJoin
  • Joins two DataFrames
  • Example: inner join
  • Comes with Exchange and Sort operators: both sides are shuffled and sorted by the join key
  • Similar to Merge Join in SQL Server
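Once both sides are sorted by the join key, the merge step walks them with two cursors. A minimal sketch of an inner sort-merge join on in-memory sequences, assuming integer keys for brevity and inputs already sorted:

```scala
// Inner sort-merge join of two inputs already sorted by key (in Spark the
// Exchange + Sort beneath the join provide this guarantee).
object SortMergeJoin {
  def join[V, W](left: IndexedSeq[(Int, V)], right: IndexedSeq[(Int, W)]): Seq[(Int, V, W)] = {
    val out = scala.collection.mutable.ArrayBuffer.empty[(Int, V, W)]
    var i = 0
    var j = 0
    while (i < left.size && j < right.size) {
      val lk = left(i)._1
      val rk = right(j)._1
      if (lk < rk) i += 1        // left key too small: advance left
      else if (lk > rk) j += 1   // right key too small: advance right
      else {
        // Find the run of equal keys on the right side ...
        var runEnd = j
        while (runEnd < right.size && right(runEnd)._1 == lk) runEnd += 1
        // ... and pair it with every left row that has the same key.
        while (i < left.size && left(i)._1 == lk) {
          for (r <- j until runEnd) out += ((lk, left(i)._2, right(r)._2))
          i += 1
        }
        j = runEnd
      }
    }
    out.toList
  }
}
```

Each side is read once in order, which is why both sides must be sorted first and why that sort (plus the shuffle) dominates the cost of this join.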
BroadcastHashJoin
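  • Joins two DataFrames when one of them is small enough to be broadcast
  • BroadcastExchange - the small DataFrame is sent to every executor
  • BroadcastHashJoin - the join operator itself
  • Hash join: the broadcast side is built into a hash table that is probed with rows of the other side, so no sorting is needed and the large side is not shuffled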
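The broadcast decision and the hash join itself can be sketched as below. This is a simplified model: the size check mirrors the idea behind spark.sql.autoBroadcastJoinThreshold (10 MB default, -1 disables), but the real planner works on estimated plan statistics and also considers join type and hints.

```scala
// Simplified model of the broadcast decision and a broadcast hash join.
object BroadcastHashJoin {
  // Default value of spark.sql.autoBroadcastJoinThreshold: 10 MB.
  val DefaultThreshold: Long = 10L * 1024 * 1024

  // Broadcast only if the estimated size fits the threshold; -1 disables it.
  def canBroadcast(estimatedBytes: Long, threshold: Long = DefaultThreshold): Boolean =
    threshold >= 0 && estimatedBytes <= threshold

  // Build a hash table from the small (broadcast) side once, then probe it with
  // each row of the large side. Neither side has to be sorted.
  def join[V, W](large: Seq[(Int, V)], small: Seq[(Int, W)]): Seq[(Int, V, W)] = {
    val hashed: Map[Int, Seq[W]] =
      small.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }
    for {
      (k, v) <- large
      w <- hashed.getOrElse(k, Seq.empty)
    } yield (k, v, w)
  }
}
```

The hash table has to fit in each executor's memory, which is why the threshold is small by default and why broadcasting a table that is larger than its estimate can cause memory problems or hit spark.sql.broadcastTimeout.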
Query Execution
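  • Additional rules are applied to the Spark Plan to produce the Executed Plan
  • The major rules are EnsureRequirements and ReuseExchange (related to optimization)
  • EnsureRequirements - adds Exchange and Sort operators wherever a child's output does not meet its parent's requirements
  • ReuseExchange - lets identical Exchange operators share one shuffle output
  • Each physical operator defines:
  • outputPartitioning - how its output is partitioned
  • outputOrdering - how its output is sorted
  • requiredChildDistribution - the partitioning it requires from its children
  • requiredChildOrdering - the ordering it requires from its children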
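How EnsureRequirements uses these four properties can be sketched with a toy plan. The node classes below are hypothetical, not Spark's classes, and "satisfies" is simplified to exact equality of hash-partitioning columns; Spark's real check is more permissive.

```scala
// Toy plan nodes (hypothetical). Partitioning is Option[Seq[String]]:
// Some(cols) means "hash-partitioned by cols", None means unknown.
sealed trait Node {
  def children: Seq[Node]
  def outputPartitioning: Option[Seq[String]]
  def outputOrdering: Seq[String]
  def requiredChildDistribution: Seq[Option[Seq[String]]] = children.map(_ => None)
  def requiredChildOrdering: Seq[Seq[String]] = children.map(_ => Nil)
}
final case class ScanNode(table: String) extends Node {
  def children: Seq[Node] = Nil
  def outputPartitioning: Option[Seq[String]] = None
  def outputOrdering: Seq[String] = Nil
}
final case class ExchangeNode(keys: Seq[String], child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def outputPartitioning: Option[Seq[String]] = Some(keys)
  def outputOrdering: Seq[String] = Nil // a shuffle destroys any ordering
}
final case class SortNode(keys: Seq[String], child: Node) extends Node {
  def children: Seq[Node] = Seq(child)
  def outputPartitioning: Option[Seq[String]] = child.outputPartitioning
  def outputOrdering: Seq[String] = keys
}
final case class SmjNode(keys: Seq[String], left: Node, right: Node) extends Node {
  def children: Seq[Node] = Seq(left, right)
  def outputPartitioning: Option[Seq[String]] = Some(keys)
  def outputOrdering: Seq[String] = keys
  override def requiredChildDistribution: Seq[Option[Seq[String]]] = Seq(Some(keys), Some(keys))
  override def requiredChildOrdering: Seq[Seq[String]] = Seq(keys, keys)
}

object EnsureRequirements {
  // Bottom-up: fix the children first, then insert Exchange / Sort where a
  // child's output does not satisfy what the parent requires.
  def apply(plan: Node): Node = plan match {
    case s: ScanNode        => s
    case ExchangeNode(k, c) => ExchangeNode(k, apply(c))
    case SortNode(k, c)     => SortNode(k, apply(c))
    case j @ SmjNode(keys, l, r) =>
      val fixed = Seq(l, r).map(c => apply(c)).zipWithIndex.map { case (child, i) =>
        val dist = j.requiredChildDistribution(i)
        val partitioned =
          if (dist.isEmpty || child.outputPartitioning == dist) child
          else ExchangeNode(dist.get, child)
        val ord = j.requiredChildOrdering(i)
        if (partitioned.outputOrdering.startsWith(ord)) partitioned
        else SortNode(ord, partitioned)
      }
      SmjNode(keys, fixed(0), fixed(1))
  }
}
```

Applied to a sort-merge join of two plain scans (hypothetical tables `posts` and `authors`), the rule produces the familiar Exchange + Sort pair under each side; if a side is already partitioned and sorted by the join key, nothing is added.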


ReuseExchange
  • Shuffle output is written to disk
  • Identical Exchange operators in the same plan can reuse that output instead of shuffling the data again
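The reuse idea can be sketched with a toy plan in which the same Exchange appears twice, as in a self-join. The node classes are hypothetical, and case-class equality stands in for Spark's comparison of canonicalized plans; in real plans the reused side shows up as ReusedExchange.

```scala
// Toy plan nodes (hypothetical, not Spark's classes).
sealed trait Plan
final case class Scan(table: String) extends Plan
final case class Exchange(keys: Seq[String], child: Plan) extends Plan
final case class ReusedExchange(of: Exchange) extends Plan
final case class Join(left: Plan, right: Plan) extends Plan

object ReuseExchange {
  def apply(plan: Plan): Plan = {
    // Exchanges seen so far; equality of case classes stands in for
    // Spark's comparison of canonicalized plans.
    val seen = scala.collection.mutable.Map.empty[Exchange, Exchange]
    def go(p: Plan): Plan = p match {
      case Exchange(k, c) =>
        val rewritten = Exchange(k, go(c))
        seen.get(rewritten) match {
          // Same shuffle already in the plan: read its output instead.
          case Some(first) => ReusedExchange(first)
          case None =>
            seen(rewritten) = rewritten
            rewritten
        }
      case Join(l, r) => Join(go(l), go(r))
      case other      => other
    }
    go(plan)
  }
}
```

In a self-join of a hypothetical `posts` table on `id`, the second Exchange is replaced, so the shuffle files are written once and read twice.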




Happy Learning!!!
