An optimizer automatically finds the most efficient plan for executing the data operations specified in the user's program. In Spark, Catalyst is that optimizer. It serves as the bridge between programs written against the high-level programming interface (the Structured APIs) and the optimized execution plan. In this article we will also see how to customize Catalyst.
Apache Spark Building Blocks
Apache Spark provides simple conceptual building blocks that can be leveraged to solve many different tasks, from graph analysis and machine learning to streaming and ingestion.
Code written against the Structured APIs is always converted into the low-level APIs internally. In a program, one can also move explicitly between the higher-level Structured APIs and the lower-level APIs and back again.
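For example, here is a minimal sketch of moving between the two layers, assuming an active SparkSession named spark as in the spark-shell:

import spark.implicits._

val df = spark.range(5).toDF("id")                       // Structured API: a DataFrame
val rdd = df.rdd                                         // down to the low-level RDD API (RDD[Row])
val doubled = rdd.map(_.getLong(0) * 2).toDF("doubled")  // and back up to a DataFrame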
- Code is written in the Structured APIs: Datasets, DataFrames, SQL
- If the code is valid, Spark converts it into a Logical Plan
- Spark's internal transformations convert the Logical Plan into a Physical Plan
Code submitted to Spark passes through the Catalyst Optimizer, which decides how the code should be executed and lays out a plan for doing so before, finally, the code is run and the result is returned to the user.
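You can watch this pipeline yourself: calling explain(true) on any Dataset prints the parsed, analyzed, and optimized logical plans followed by the physical plan. A minimal sketch, assuming an active SparkSession named spark as in the spark-shell:

val df = spark.range(100).toDF("n")
// Prints the == Parsed Logical Plan ==, == Analyzed Logical Plan ==,
// == Optimized Logical Plan == and == Physical Plan == sections.
df.filter("n > 10").explain(true)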
Logical Planning
The logical plan only represents a set of abstract transformations that do not refer to executors or drivers; it exists purely to convert the user's set of expressions into the most optimized version. Spark starts by converting user code into an unresolved logical plan. It is unresolved because, while your code may be valid, the tables or columns it refers to may or may not exist. Spark uses the catalog, a repository of all table and DataFrame information, to resolve columns and tables in the analyzer. The analyzer may reject the unresolved logical plan if a required table or column name does not exist in the catalog. If the analyzer can resolve it, the result is passed through the optimizer, a collection of rules that attempts to optimize the logical plan, for example by pushing down predicates or selections.
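Each of these stages can be inspected through queryExecution, and the analyzer's rejection can be provoked by referring to a column that does not exist. A sketch, reusing the df from the previous example:

val q = df.filter("n > 10")
println(q.queryExecution.logical)       // unresolved logical plan
println(q.queryExecution.analyzed)      // resolved against the catalog
println(q.queryExecution.optimizedPlan) // after the optimizer's rule batches

// The analyzer rejects a reference it cannot resolve in the catalog:
// df.select("no_such_column")          // throws AnalysisException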
Physical Planning
After successfully creating an optimized logical plan, Spark begins the physical planning process. The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model. Physical planning results in a series of RDDs and transformations. This result is why you may have heard Spark referred to as a compiler: it takes queries in DataFrames, Datasets, and SQL and compiles them into RDD transformations for you.
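The physical plan, and the RDD lineage it ultimately compiles to, can be inspected in the same way. A sketch continuing from the previous example:

println(q.queryExecution.sparkPlan)    // chosen physical plan
println(q.queryExecution.executedPlan) // physical plan after final preparation rules
println(q.rdd.toDebugString)           // the underlying RDDs the query compiles to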
Catalyst models the user's program as a tree, and optimization as transformations from one tree to another, e.g. the transformation of a Logical Plan into a Physical Plan. A few examples of such tree-to-tree transformations used during optimization (a sketch follows the list):
- Transform: the basic mechanism, a partial function that is pattern-matched against tree nodes and rewrites the ones it matches
- Predicate Pushdown: filters are moved as close to the data source as possible, so fewer rows are scanned
- Column Pruning: only the columns a query actually references are read
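Predicate pushdown and column pruning are both visible in the physical plan of a query over a columnar source such as Parquet. A minimal sketch, where the path /tmp/users.parquet is hypothetical and assumed to hold name and age columns:

val users = spark.read.parquet("/tmp/users.parquet") // hypothetical input
users.filter("age > 21").select("name").explain()
// The FileScan node in the output shows something like
// PushedFilters: [IsNotNull(age), GreaterThan(age,21)] and a ReadSchema limited
// to the name and age columns -- predicate pushdown and column pruning at work.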
Transformations over trees, i.e. expression to expression, logical tree to logical tree, and logical tree to physical tree, are achieved using various sets of rules. To take Catalyst further, we can explicitly add custom rules to suit our own requirements.
Spark 2.0 added an experimental API for user-defined custom optimizations. As an example, the rule below rewrites a division by the literal 1 into its left operand, since dividing by one is a no-op.
import org.apache.spark.sql.catalyst.expressions.{Divide, Literal}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Rewrites `expr / 1` to just `expr`. By the time the extra optimizations run,
// constant folding has already reduced the right-hand side to a Double literal,
// so it is enough to match a Literal whose value equals 1.
object RuleDivideOptimization extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Divide(left, right) if right.isInstanceOf[Literal] &&
        right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1 =>
      left
  }
}
// Before registering the rule, inspect the optimized plan:
val df = spark.range(10).toDF("number")
println(df.selectExpr("number/1").queryExecution.optimizedPlan.numberedTreeString)
Plans are read from the bottom up. The printed tree has two nodes:
- 01 Range: the DataFrame we created using range
- 00 Project: the projection, which still contains the division expression
Now register the rule through the experimental extra-optimizations hook and run the same query again:
spark.experimental.extraOptimizations = Seq(RuleDivideOptimization)
println(df.selectExpr("number/1").queryExecution.optimizedPlan.numberedTreeString)
If we observe the output now, the projection node has become:
- 00 Project [cast(id#0L as double) AS (number / 1)#9]
The division is gone, which shows that our custom optimization rule was applied successfully.
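As an aside, since Spark 2.2 a custom rule can also be registered without the experimental hook, by using SparkSessionExtensions when the session is built. A minimal sketch of that alternative (not the method used above):

import org.apache.spark.sql.SparkSession

val sparkWithRule = SparkSession.builder()
  .master("local[*]")
  .withExtensions(extensions =>
    // Inject our rule into the optimizer of every session built from this builder.
    extensions.injectOptimizerRule(session => RuleDivideOptimization))
  .getOrCreate()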
References -
- Databricks
- Spark: The Definitive Guide
- Hadoop Summit