Apache Spark comes with lot of built in generic operators to do data processing. But many a times, when we are building real world applications, we need domain specific operators to solve problem in hand. So in these cases, we like to extend the Spark API to add our own custom operators.
We can extend spark API in two ways. One of the way is to add custom operator for existing RDD’s and second is to one create our own RDD.
In this post, we are going to discuss both the methods.
tl;dr Access complete code on github.
Let’ say we have sales data from an online store. The data is in csv format. It contains transactionId, customerId, itemId and itemValue. This model is represented as SalesRecord.
So whenever we get sales data, we convert the raw data to RDD[SalesRecord].
Let’s say we want to find out total amount of sales, then in Spark we can write
though it’s concise, it’s not super readable. It will be nice to have
In the above code, the totalSales feels like built in spark operator.Of course spark don’t know anything about our data or our data model. Then how we can add our own custom operator on RDD?
Adding custom operators to RDD
The following are the steps to add custom operator’s to RDD.
Step 1 : Define Utility class to hold custom operators
The following code defines an utility class, CustomFunctions , which holds all the custom operators. We take specific RDD,i.e RDD[SalesRecord] so that these operators only available on sales record RDD.
Step 2 : Implicit conversion to add operators on RDD
The following code defines an implicit function, addCustomFunctions which will add all the custom functions defined in CustomFunctions to the RDD[SalesRecord]
Step 3: Use custom functions, using implicit import
The following code has access to custom operator, totalSales using CustomFunctions._ import.
With the above steps, you defined a domain specific operator on RDD.
Creating custom RDD
In the earlier example, we implemented an action which result in single value. But what about the situation where we want to represent lazily evaluated actions?. For example, let’s say we want to give discount to each sales in the RDD. These discounts are lazy in nature. So we need a RDD which can represent the laziness. In following steps we are going to create a RDD called DiscountRDD which holds the discount calculation.
Step 1: Create DiscountRDD by extending RDD
In the above code, we created a RDD called DiscountRDD. It is a RDD derived by applying discount on sales RDD. When we extend RDD, we have to override two methods
This method is the one which computes value for each partition of RDD. In our code, we take input sales record and output it by applying discount as specified by discountPercentage.
getPartitions method allows developer to specify the new partitions for the RDD. As we don’t change the partitions in our example, we can just reuse the partitions of parent RDD.
Step 2: Add a custom operator named discount
Using similar trick discussed earlier, we can add custom operator called discount which creates DiscountRDD.
Step 3 : Use discount, using implicit import
So now you know how you can extends spark API for your own domain specific use cases.