Tuesday, July 11, 2017

Apache Spark – Catalyst Optimizer


Optimizer is the one that automatically finds out the most efficient plan to execute data operations specified in the user’s program. In Spark, Catalyst is such an optimizer. It serves as a bridge between program written in High Level Programming interface (Structured APIs) and optimized execution plan. We will also see how we can customize the catalyst in this article.


Apache Spark Building Blocks



Apache Spark’s - Simple Conceptual Building blocks that leverages spark to solve various different task, from graph analysis and machine learning to streaming and ingestions.



Code which is written in Structured APIs is always converted into Low level APIs internally. In programming, one can navigate between Higher i.e. Structured APIs to Lower level APIs and vice versa.
Using the Structured APIs one can leverage application in following ways –
  1. Code in Structured APIs like Datasets, DataFrames, SQL
  2. If valid code, Spark converts it into a Logical Plan
  3. Spark internal transformation converts Logical Plan to Physical Plan
  4. Spark then executes this Physical Plan on cluster



How Catalyst Works
The code which is submitted to Spark, passes through the Catalyst Optimizer which decides how the code should be executed and lays out a plan for doing so, before finally the code is run and the result is returned to the user.




Logical Planning


This logical plan only represents a set of abstract transformations that do not refer to executors or drivers, it's purely to convert the users set of expressions into the most optimized version. It does this by converting user code into an unresolved logical plan. This unresolved because while your code may be valid, the tables or columns that it refers to may or may not exist. Spark uses the catalog, a repository of all table and DataFrame information, in order to resolve columns and tables in the analyzer. The analyzer may reject the unresolved logical plan if it the required table or column name does not exist in the catalog. If it can resolve it, this result is passed through the optimizer, a collection of rules, which attempts to optimize the logical plan by pushing down predicates or selections.


Physical Planning


After successfully creating an optimized logical plan, Spark then begins the physical planning process. The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model. Physical planning results in a series of RDDs and transformations. This result is why you may have heard Spark referred to as a compiler, it takes queries in DataFrames, Datasets, and SQL and compiles them into RDD transformations for you.

Catalyst defines the abstraction of users programs as a Tree and the transformation from one tree to another tree i.e. transformation of Logical plan to Physical plan. Few examples of transformation from one tree to another tree as a process of optimization.

Transform



Predicate Pushdown



Column Pruning


Transformation of trees i.e. expression to expression, logical trees to logical trees and logical trees to physical trees is achieved using various set of rules. To make Catalyst more optimized, as per our requirement we can explicitly add custom Rules.
In Spark 2.0, we have an experimental API for adding user defined custom optimizations.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions.{Literal, Divide}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object RuleDivideOptimization extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Divide(left,right) if right.isInstanceOf[Literal] &&
      right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1 =>        
      left
  }
}

val df = spark.range(10).toDF("number")
df.selectExpr("number/1").queryExecution.optimizedPlan.numberedTreeString


Plans are read bottom to top. The below are the two nodes of tree
  • 01 Range - Signifies the dataframe we created using range
  • 00 Project - Signifies the projection
spark.experimental.extraOptimizations = Seq(RuleDivideOptimization)
df.selectExpr("number/1").queryExecution.optimizedPlan.numberedTreeString


If we observe the output now,
  • 00 Project [cast(id#0L as double) AS (number / 1)#9]
  • 01 +- Range (0, 10, step=1, splits=Some(4))
You can observe now that division is gone. This denotes that our optimization is applied successfully.
References -
  • Databricks
  • Apache Spark The Definitive Guide
  • Hadoop Summit

Monday, July 3, 2017

Indeed Scrapper

Indeed is a Job Posting, Employee Comments and Reviews Social Forum. One can understand the sentiment of Employer by reading the comments.

So now a days, many companies wants to integrate their internal data warehouse with external data sets. Below is the Indeed Python Scrapper, that crawls the comments, ratings etc. and dump the output in csv file.

# load the library
from bs4 import BeautifulSoup as Soup
import urllib, requests, re, lxml, csv, sys
import pandas as pd

#http://www.indeed.com/cmp/Google/reviews?fcountry=US

df = pd.DataFrame()  # create a new data frame
host = 'http://www.indeed.com'
cmp_tag = '/cmp/'
rev = '/reviews'
countryfilter = '?fcountry=US'
isFlag = True
with open('companies.csv', 'rb') as f:
    reader = csv.reader(f)
    for row in reader:
                isFlag = True
  try:
   company_name = row[0]
    
   # complete URL
   url = '%s%s%s%s%s' % (host, cmp_tag, company_name, rev, countryfilter)
                        print "%s" %(url)
   target = Soup(urllib.urlopen(url), 'lxml')                          
   targetElements = target.findAll('div', attrs={'class': 'cmp-review-container'})
   
   for elem in targetElements:
    cmp_review = elem.find('div', attrs={'class': 'cmp-review'})
    
    # Review Heading
    cmp_review_heading_cont = cmp_review.find('div', attrs={'class': 'cmp-review-heading'})   
    cmp_review_heading_title = cmp_review_heading_cont.find('span', attrs={'itemprop': 'name'}).getText()     
    
    # Review Overall Rating    
    cmp_review_heading_cont_span = cmp_review_heading_cont.find('span', attrs={'itemprop': 'reviewRating'})
    cmp_review_heading_rating = cmp_review_heading_cont_span.find('meta').attrs['content']    
      
    # Review Content
    cmp_review_cc = cmp_review.find('div', attrs={'class': 'cmp-review-content-container'})
    cmp_review_desc = cmp_review_cc.find('div', attrs={'class': 'cmp-review-description'})
    review_text = cmp_review_desc.find('span', attrs={'class':'cmp-review-text'}).getText()
    df = df.append({'comp_name': company_name, 'comment': review_text, 'Review Rating': cmp_review_heading_rating, 'Review Heading': cmp_review_heading_title},
    ignore_index=True)
   while isFlag:
    head_ele = target.find('head')
    next_href = head_ele.find('link', attrs={'rel': 'next'})
    if next_href != None:
     next_href = next_href.get('href')
     url = '%s%s' % (host, next_href)
     print '%s' % url
     target = Soup(urllib.urlopen(url), 'lxml')
     targetElements = target.findAll('div', attrs={'class': 'cmp-review-container'})     
     for elem in targetElements:      
      cmp_review = elem.find('div', attrs={'class': 'cmp-review'})
      
      # Review Heading
      cmp_review_heading_cont = cmp_review.find('div', attrs={'class': 'cmp-review-heading'})   
      cmp_review_heading_title = cmp_review_heading_cont.find('span', attrs={'itemprop': 'name'}).getText()     
    
      # Review Overall Rating    
      cmp_review_heading_cont_span = cmp_review_heading_cont.find('span', attrs={'itemprop': 'reviewRating'})
      cmp_review_heading_rating = cmp_review_heading_cont_span.find('meta').attrs['content']    
      
      cmp_review_cc = cmp_review.find('div', attrs={'class': 'cmp-review-content-container'})
      cmp_review_desc = cmp_review_cc.find('div', attrs={'class': 'cmp-review-description'})
      review_text = cmp_review_desc.find('span', attrs={'class':'cmp-review-text'}).getText()
      
      
      df = df.append({'comp_name': company_name, 'comment': review_text, 'Review Rating': cmp_review_heading_rating,'Review Heading': cmp_review_heading_title},
      ignore_index=True)
    else:
     isFlag = False
  except Exception as e:
   print("Unexpected error:", sys.exc_info()[0])

# Save the result to CSV
df.to_csv('D:\indeed_reviews.csv', encoding='utf-8')

       
 

Apache Spark – Catalyst Optimizer

Optimizer is the one that automatically finds out the most efficient plan to execute data operations specified in the user’s program. In...