Tuesday, July 11, 2017

Apache Spark – Catalyst Optimizer


Optimizer is the one that automatically finds out the most efficient plan to execute data operations specified in the user’s program. In Spark, Catalyst is such an optimizer. It serves as a bridge between program written in High Level Programming interface (Structured APIs) and optimized execution plan. We will also see how we can customize the catalyst in this article.


Apache Spark Building Blocks



Apache Spark’s - Simple Conceptual Building blocks that leverages spark to solve various different task, from graph analysis and machine learning to streaming and ingestions.



Code which is written in Structured APIs is always converted into Low level APIs internally. In programming, one can navigate between Higher i.e. Structured APIs to Lower level APIs and vice versa.
Using the Structured APIs one can leverage application in following ways –
  1. Code in Structured APIs like Datasets, DataFrames, SQL
  2. If valid code, Spark converts it into a Logical Plan
  3. Spark internal transformation converts Logical Plan to Physical Plan
  4. Spark then executes this Physical Plan on cluster



How Catalyst Works
The code which is submitted to Spark, passes through the Catalyst Optimizer which decides how the code should be executed and lays out a plan for doing so, before finally the code is run and the result is returned to the user.




Logical Planning


This logical plan only represents a set of abstract transformations that do not refer to executors or drivers, it's purely to convert the users set of expressions into the most optimized version. It does this by converting user code into an unresolved logical plan. This unresolved because while your code may be valid, the tables or columns that it refers to may or may not exist. Spark uses the catalog, a repository of all table and DataFrame information, in order to resolve columns and tables in the analyzer. The analyzer may reject the unresolved logical plan if it the required table or column name does not exist in the catalog. If it can resolve it, this result is passed through the optimizer, a collection of rules, which attempts to optimize the logical plan by pushing down predicates or selections.


Physical Planning


After successfully creating an optimized logical plan, Spark then begins the physical planning process. The physical plan, often called a Spark plan, specifies how the logical plan will execute on the cluster by generating different physical execution strategies and comparing them through a cost model. Physical planning results in a series of RDDs and transformations. This result is why you may have heard Spark referred to as a compiler, it takes queries in DataFrames, Datasets, and SQL and compiles them into RDD transformations for you.

Catalyst defines the abstraction of users programs as a Tree and the transformation from one tree to another tree i.e. transformation of Logical plan to Physical plan. Few examples of transformation from one tree to another tree as a process of optimization.

Transform



Predicate Pushdown



Column Pruning


Transformation of trees i.e. expression to expression, logical trees to logical trees and logical trees to physical trees is achieved using various set of rules. To make Catalyst more optimized, as per our requirement we can explicitly add custom Rules.
In Spark 2.0, we have an experimental API for adding user defined custom optimizations.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.expressions.{Literal, Divide}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object RuleDivideOptimization extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
    case Divide(left,right) if right.isInstanceOf[Literal] &&
      right.asInstanceOf[Literal].value.asInstanceOf[Double] == 1 =>        
      left
  }
}

val df = spark.range(10).toDF("number")
df.selectExpr("number/1").queryExecution.optimizedPlan.numberedTreeString


Plans are read bottom to top. The below are the two nodes of tree
  • 01 Range - Signifies the dataframe we created using range
  • 00 Project - Signifies the projection
spark.experimental.extraOptimizations = Seq(RuleDivideOptimization)
df.selectExpr("number/1").queryExecution.optimizedPlan.numberedTreeString


If we observe the output now,
  • 00 Project [cast(id#0L as double) AS (number / 1)#9]
  • 01 +- Range (0, 10, step=1, splits=Some(4))
You can observe now that division is gone. This denotes that our optimization is applied successfully.
References -
  • Databricks
  • Apache Spark The Definitive Guide
  • Hadoop Summit

Monday, July 3, 2017

Indeed Scrapper

Indeed is a Job Posting, Employee Comments and Reviews Social Forum. One can understand the sentiment of Employer by reading the comments.

So now a days, many companies wants to integrate their internal data warehouse with external data sets. Below is the Indeed Python Scrapper, that crawls the comments, ratings etc. and dump the output in csv file.

# load the library
from bs4 import BeautifulSoup as Soup
import urllib, requests, re, lxml, csv, sys
import pandas as pd

#http://www.indeed.com/cmp/Google/reviews?fcountry=US

df = pd.DataFrame()  # create a new data frame
host = 'http://www.indeed.com'
cmp_tag = '/cmp/'
rev = '/reviews'
countryfilter = '?fcountry=US'
isFlag = True
with open('companies.csv', 'rb') as f:
    reader = csv.reader(f)
    for row in reader:
                isFlag = True
  try:
   company_name = row[0]
    
   # complete URL
   url = '%s%s%s%s%s' % (host, cmp_tag, company_name, rev, countryfilter)
                        print "%s" %(url)
   target = Soup(urllib.urlopen(url), 'lxml')                          
   targetElements = target.findAll('div', attrs={'class': 'cmp-review-container'})
   
   for elem in targetElements:
    cmp_review = elem.find('div', attrs={'class': 'cmp-review'})
    
    # Review Heading
    cmp_review_heading_cont = cmp_review.find('div', attrs={'class': 'cmp-review-heading'})   
    cmp_review_heading_title = cmp_review_heading_cont.find('span', attrs={'itemprop': 'name'}).getText()     
    
    # Review Overall Rating    
    cmp_review_heading_cont_span = cmp_review_heading_cont.find('span', attrs={'itemprop': 'reviewRating'})
    cmp_review_heading_rating = cmp_review_heading_cont_span.find('meta').attrs['content']    
      
    # Review Content
    cmp_review_cc = cmp_review.find('div', attrs={'class': 'cmp-review-content-container'})
    cmp_review_desc = cmp_review_cc.find('div', attrs={'class': 'cmp-review-description'})
    review_text = cmp_review_desc.find('span', attrs={'class':'cmp-review-text'}).getText()
    df = df.append({'comp_name': company_name, 'comment': review_text, 'Review Rating': cmp_review_heading_rating, 'Review Heading': cmp_review_heading_title},
    ignore_index=True)
   while isFlag:
    head_ele = target.find('head')
    next_href = head_ele.find('link', attrs={'rel': 'next'})
    if next_href != None:
     next_href = next_href.get('href')
     url = '%s%s' % (host, next_href)
     print '%s' % url
     target = Soup(urllib.urlopen(url), 'lxml')
     targetElements = target.findAll('div', attrs={'class': 'cmp-review-container'})     
     for elem in targetElements:      
      cmp_review = elem.find('div', attrs={'class': 'cmp-review'})
      
      # Review Heading
      cmp_review_heading_cont = cmp_review.find('div', attrs={'class': 'cmp-review-heading'})   
      cmp_review_heading_title = cmp_review_heading_cont.find('span', attrs={'itemprop': 'name'}).getText()     
    
      # Review Overall Rating    
      cmp_review_heading_cont_span = cmp_review_heading_cont.find('span', attrs={'itemprop': 'reviewRating'})
      cmp_review_heading_rating = cmp_review_heading_cont_span.find('meta').attrs['content']    
      
      cmp_review_cc = cmp_review.find('div', attrs={'class': 'cmp-review-content-container'})
      cmp_review_desc = cmp_review_cc.find('div', attrs={'class': 'cmp-review-description'})
      review_text = cmp_review_desc.find('span', attrs={'class':'cmp-review-text'}).getText()
      
      
      df = df.append({'comp_name': company_name, 'comment': review_text, 'Review Rating': cmp_review_heading_rating,'Review Heading': cmp_review_heading_title},
      ignore_index=True)
    else:
     isFlag = False
  except Exception as e:
   print("Unexpected error:", sys.exc_info()[0])

# Save the result to CSV
df.to_csv('D:\indeed_reviews.csv', encoding='utf-8')

       
 

Thursday, June 22, 2017

Amazon Alexa - Intelligent Personal Assistant

As technologies are moving with a fast pace, things evolved from Web Applications to
mobile applications and now we want everything on voice command. Thanks to Amazon that comes with an amazing Artificial Intelligent IoT Device Alexa.
Alexa is an intelligent personal assistant developed by Amazon, made popular by the Amazon Echo and the Amazon Echo Dot devices developed by Amazon Lab126. It is capable of voice interaction, music playback, making to-do lists, setting alarms, streaming podcasts, playing audiobooks, and providing weather, traffic, and other real time information, such as news. Alexa can also control several smart devices using itself as a home automation system. Currently, interaction and communication with Alexa is only available in English and German. Now, LG Electronic devices are coming equipped with Alexa by which one can place voice commands to perform the task. Similarly Ford Amazon tie-up lets you control your car with your voice.
Build an interesting use case to recharge a Mobile Phone using Alexa Device. Follow the steps to build Skill and Lambda functions as part of the Alexa Application Development on Amazon AWS.

References -
  • Linkedin

Tuesday, April 11, 2017

Get Top, Bottom N Elements by Key

Given a set of (key-as-string, value-as-integer) pairs, say we want to create a
top N (where N > 0) list. Top N is a design pattern. For example, if key-as-string is a URL
and value-as-integer is the number of times that URL is visited, then you might
ask: what are the top 10 URLs for last week? This kind of question is common for
these types of key-value pairs. Finding a top 10 list is categorized as a filtering pattern
(i.e., you filter out data and find the top 10 list).

Approach 1 -
val inputRDD = sc.parallelize(Array(("A", 5), ("A",10), ("A", 6), ("B", 67), ("B", 78), ("B", 7)))
val resultsRDD = inputRDD.groupByKey.map{case (key, numbers) => key -> numbers.toList.sortBy(x => x).take(2)}
resultsRDD .collect()

Approach 2 -
val inputRDD  = sc.parallelize(Array(("A", 5), ("A",10), ("A", 6), ("B", 67), ("B", 78), ("B", 7)))
inputRDD .groupByKey.mapValues(number => number.toList.sortBy(x => x).take(2)).collect()

We can add negation at x => -x for bottom N elements.

Monday, March 13, 2017

SBT Project Creation

  1. Install SBT Window Plugin http://www.scala-sbt.org/download.html
  2. After installation set the Environmental path
    1. SBT_HOME
    2. Path
  3. Once Environmental setup is done create a directory with the project name say DEMOSBT
  4. Inside the DEMOSBT create a file name called build.sbt and folder project
  5. Inside the build.sbt write
name := "ProjectName"
version := "1.0"
scalaVersion := "2.10."4
Inside this file further we will be adding dependencies and assembly plugin details to build the UBER JAR.
  1. Inside the DEMOSBT\project folder create a file named plugins.sbt
  2. In plugins.sbt add below line to add Eclipse dependency
 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.5.0")
Further similar to that we will add assembly plugin dependency as well for UBER JAR creation
  1. From the command window, move to DEMOSBT path and run sbt eclipse command that adds all the required dependency
  2. Import the project in Eclipse
  3. Down the line if you want to add any dependency then you have to modify build.sbt file. After that, again you have to run the sbt eclipse Refresh the eclipse project you will get the referenced libraries in the bundle.

Spark Installation on Windows

  • Choose a Spark pre built package for Hadoop i.e.  Pre-built for Hadoop 2.6 or later. Download and extract it to any drive i.e. D:\spark-2.1.0-bin-hadoop2.6
  • Set SPARK_HOME and add %SPARK_HOME%\bin in PATH in environment variables
  • Run following command on command line.
spark-shell
  • You’ll get an error for winutils.exe:
winutil
      Though we aren’t using Hadoop with Spark, but somewhere it checks for HADOOP_HOME variable in configuration. So to overcome this error, download winutils.exe and place it in any location (i.e. D:\winutils\bin\winutils.exe).
P.S. As per the Operating system version, this winutils.exe may vary. So in case, if it doesn't support to your OS, please find another one and use. You can refer this Problems running Hadoop on Windows link for winutils.exe.
  • Set HADOOP_HOME = D:\winutils in environment variable
  • Now, Re run the command "spark-shell", you’ll see the scala shell. For latest spark releases, if you get the permission error for /tmp/hive directory as given below:
The root scratch dir: /tmp/hive on HDFS should be writable. Current permissions are: rw-rw-rw-
You need to run following command :
D:\spark>D:\winutils\bin\winutils.exe chmod 777 D:\tmp\hive

Apache Kafka Installation on Windows

  1. Make sure JDK and JRE are installed and path is set in environment variable
  2. Download Apache Kafka from https://www.apache.org/dyn/closer.cgi?path=/kafka/0.10.2.0/kafka_2.10-0.10.2.0.tgz
  3. Unpack kafka_2.10-0.10.2.0.tgz
  4. Copy extracted kafka_2.10-0.8.2.1 directory to C:\
  5. Open C:\kafka_2.10-0.10.2.0\config\server.properties and update existing log.dir entry as mentioned below :
    dirs=c:/kafka_2.10-0.10.2.0/kafka-logs
  6. Open C:\kafka_2.10-0.10.2.0\config\zookeeper.properties and update existing dataDir entry as mentioned below :
    dataDir=c:/kafka_2.10-0.10.2.0/zookeeper-data(Note : Make sure you use forward slash as depicted for step-5 and step-6)
  7. Installation of Single Node Single Broker Kafka is done on Windows
  8. Starting Zookeeper and Kafka server
    Open a command prompt and start Zookeeper server using following command :
    cd c:\kafka_2.10-0.10.2.0
    bin\windows\zookeeper-server-start.bat .\config\zookeeper.properties
  9. Now open another command prompt and start Kafka server using following command
    .\bin\windows\kafka-server-start.bat .\config\server.properties
    I hope both Zookeeper and Kafka server are executed successfully for you.
  10. Now let's create Kafka Topic using following command
    cd C:\kafka_2.10-0.10.2.0
    bin\windows\kafka-topics.bat ––create ––zookeeper localhost:2181 ––replication-factor 1 ––partition 1 ––topic test
  11. List the topics using following command
    cd C:\kafka_2.10-0.10.2.0
    bin\windows\kafka-topics.bat ––list  ––zookeeper localhost:2181
  12. Send message using Kafka console producer
    cd C:\kafka_2.10-0.10.2.0
    bin\windows\kafka-console-producer.bat ––broker-list localhost:9092 ––topic test
  13. Consume the above message using Kafka console consumer
    cd C:\kafka_2.10-0.10.2.0
    bin\windows\kafka-console-consumer.bat ––zookeeper localhost:2181 ––topic test ––from-beginning

Apache Spark – Catalyst Optimizer

Optimizer is the one that automatically finds out the most efficient plan to execute data operations specified in the user’s program. In...