Spark 2.0 is the next major release of Apache Spark. It brings major changes to the abstractions, APIs, and libraries of the platform, and it sets the tone for the framework's direction over the coming year. Understanding these features is therefore important for anyone who wants to take advantage of everything the new release offers. In this series of blog posts, I will discuss the different improvements landing in Spark 2.0.

This is the third blog in the series, where I discuss how to port your RDD based code to the Dataset API. You can access all the posts in the series here.

TL;DR All code examples are available on github.

## RDD to Dataset

The Dataset API combines the best of the RDD and DataFrame APIs in a single API. Many of the methods on Dataset mimic the RDD API, even though they differ significantly in implementation, so most RDD code can be ported to the Dataset API without much effort. In this post, I will share a few code snippets that show how code written with the RDD API can be expressed with the Dataset API.
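
All the snippets below assume that a SparkSession, and the SparkContext obtained from it, are already in scope. Here is a minimal sketch of that setup; the master and application name are just illustrative placeholders, not part of the original example.

import org.apache.spark.sql.SparkSession

// build a SparkSession; master and appName are placeholder values
val sparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("rdd-to-dataset")
  .getOrCreate()

// the underlying SparkContext is still available for the RDD examples
val sparkContext = sparkSession.sparkContext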

## 1. Loading Text Data

### RDD

val rdd = sparkContext.textFile("src/main/resources/data.txt")

### Dataset

import sparkSession.implicits._
val ds = sparkSession.read.text("src/main/resources/data.txt").as[String]

## 2. Calculating count

### RDD

rdd.count()

### Dataset

ds.count()

## 3. WordCount Example

### RDD

val wordsRDD = rdd.flatMap(value => value.split("\\s+"))
val wordsPair = wordsRDD.map(word => (word, 1))
val wordCount = wordsPair.reduceByKey(_ + _)

### Dataset

import sparkSession.implicits._
val wordsDs = ds.flatMap(value => value.split("\\s+"))
val wordsPairDs = wordsDs.groupByKey(value => value)
val wordCountDs = wordsPairDs.count()
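
Note that the two results differ slightly in type: reduceByKey on the pair RDD yields an RDD[(String, Int)], while groupByKey followed by count() yields a Dataset[(String, Long)], because count() produces Long values. For small test data, a quick way to inspect both results:

// print the RDD word counts as (word, count) pairs
wordCount.collect().foreach(println)

// print a few rows of the Dataset word counts in tabular form
wordCountDs.show()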

## 4. Caching

### RDD

rdd.cache()

### Dataset

ds.cache()

## 5. Filter

### RDD

val filteredRDD = wordsRDD.filter(value => value == "hello")

### Dataset

val filteredDS = wordsDs.filter(value => value == "hello")

## 6. Map Partitions

### RDD

val mapPartitionsRDD = rdd.mapPartitions(iterator => List(iterator.count(value => true)).iterator)

### Dataset

val mapPartitionsDs = ds.mapPartitions(iterator => List(iterator.count(value => true)).iterator)
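
Both versions emit one element per partition, namely the number of records in that partition. As a quick sanity check, summing those per-partition counts gives back the total record count; for example, on the Dataset side:

// add up the per-partition counts; this should equal ds.count()
val totalFromPartitions = mapPartitionsDs.reduce(_ + _)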

## 7. reduceByKey

### RDD

val reduceCountByRDD = wordsPair.reduceByKey(_+_)

### Dataset

val reduceCountByDs = wordsPairDs.mapGroups((key, values) => (key, values.length))
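
mapGroups gives full control over each group, but it cannot do partial aggregation, so every value is shuffled to its group. For a plain count, the groupByKey(...).count() form from the WordCount example is usually the better fit; a sketch of the equivalent:

// equivalent count using the built-in typed aggregation, which supports partial aggregation
val reduceCountByDsAgg = wordsDs.groupByKey(value => value).count()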

## 8. Conversions

### Dataset to RDD

 val dsToRDD = ds.rdd

### RDD to Dataset

Converting an RDD to a Dataset takes a little more work, because we need to specify the schema explicitly. Here we show how to convert an RDD[String] into a Dataset[String] by going through a DataFrame.

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

val rddStringToRowRDD = rdd.map(value => Row(value))
val dfschema = StructType(Array(StructField("value", StringType)))
val rddToDF = sparkSession.createDataFrame(rddStringToRowRDD, dfschema)
val rDDToDataSet = rddToDF.as[String]
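
When the element type already has an encoder, as String does, the implicits provide a much shorter route; the Row and StructType dance above is only needed when you want to control the schema explicitly. A minimal sketch:

import sparkSession.implicits._

// for types with a built-in encoder, toDS() converts the RDD directly
val rddToDatasetDirect = rdd.toDS()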

## 9. Double Based Operations

### RDD

val doubleRDD = sparkContext.makeRDD(List(1.0,5.0,8.9,9.0))
val rddSum = doubleRDD.sum()
val rddMean = doubleRDD.mean()

### Dataset

val rowRDD = doubleRDD.map(value => Row.fromSeq(List(value)))
val schema = StructType(Array(StructField("value",DoubleType)))
val doubleDS = sparkSession.createDataFrame(rowRDD,schema)

import org.apache.spark.sql.functions._
doubleDS.agg(sum("value"))
doubleDS.agg(mean("value"))
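agg returns a single-row DataFrame rather than a plain number, so to get a Double comparable to rddSum and rddMean you still need to pull the value out, for example:

// extract the numeric results from the single-row aggregation DataFrames
val dsSum = doubleDS.agg(sum("value")).first().getDouble(0)
val dsMean = doubleDS.agg(mean("value")).first().getDouble(0)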

## 10. Reduce API

### RDD

val rddReduce = doubleRDD.reduce((a, b) => a + b)

### Dataset

val dsReduce = doubleDS.reduce((row1, row2) => Row(row1.getDouble(0) + row2.getDouble(0)))
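
Since doubleDS is a Dataset[Row], the reduce produces a Row rather than a Double; the underlying value can be read back with getDouble:

// pull the summed value out of the resulting Row
val dsReduceValue = dsReduce.getDouble(0)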

You can access the complete code here.

The above code samples show how to move your RDD based code base to the new Dataset API. Though they don't cover the complete RDD API, they should give you a fair idea of how the RDD and Dataset APIs are related.