Spark 2.0 brings significant changes to the abstractions and APIs of the Spark platform. Along with a performance boost, this version makes some non backward compatible changes to the framework. To keep up with the latest updates, one needs to migrate their Spark 1.x code base to 2.x. In the last few weeks, I was involved in migrating a fairly large code base and found it to be quite an involved process. In this series of posts, I will be documenting my experience of the migration so it may help everyone out there who is planning to do the same.

This is the third post in this series. In this post we discuss migrating DataFrame based APIs to Dataset based ones. You can access all the posts here.

TL;DR You can access all the code on github.

DataFrame Abstraction in Spark 1.x

In the 1.x series of Spark, DataFrame was a structured abstraction over the native RDD abstraction. DataFrame was built to support structured transformations of data using the DataFrame DSL or the Spark SQL query language, whereas the RDD abstraction provided the functional APIs for manipulating data.

So whenever we wanted to use the functional APIs on a DataFrame, we would convert the DataFrame into an RDD and then manipulate it through the RDD abstraction. This conversion made it easy to move between the DataFrame and RDD abstractions.

DataFrame to RDD Example

The below code shows how to use the functional map API on the DataFrame abstraction.

val loadedDF = sqlContext.read.format("com.databricks.spark.csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load("../test_data/sales.csv")

val amountRDD = loadedDF.map(row => row.getDouble(3))

In the above code, whenever we call map, Spark implicitly converts the DataFrame into an RDD. This is how we operated on DataFrames in Spark 1.x.

You can access complete code on github.

DataFrame Abstraction in Spark 2.x

From Spark 2.x, the DataFrame abstraction has changed significantly. In 2.x, DataFrame is an alias for Dataset[Row]. Dataset is a combination of both the DataFrame and RDD style APIs: not only does a Dataset support structured querying using the DSL and SQL, it also supports the functional APIs that RDD supports.

So whenever we call map in 2.x, we no longer get an RDD. Instead we get a Dataset. This change in the conversion will break your code if you are using the RDD based code shown earlier.
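As a minimal sketch of what this looks like in 2.x (the builder settings and variable names here are illustrative assumptions; the author's actual loading code is on github), loading the same sales.csv through SparkSession gives back a DataFrame that is literally a Dataset[Row]:

import org.apache.spark.sql.{Dataset, Row, SparkSession}

// illustrative builder settings; adjust master and app name to your environment
val sparkSession = SparkSession.builder().master("local").appName("migration").getOrCreate()

// in 2.x, DataFrame is just a type alias for Dataset[Row]
val loadedDF: Dataset[Row] = sparkSession.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("../test_data/sales.csv")

The same loadedDF now answers both the structured DSL queries and functional calls like map.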

Porting DataFrame to RDD Code in 2.x

To port your existing code, you need to add an extra .rdd before calling the map method, as shown in the code below.

val amountRDD = loadedDF.rdd.map(row => row.getDouble(3))
println(amountRDD.collect.toList)
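Note that amountRDD is now a plain RDD[Double]. If you need a DataFrame again, you have to convert it back yourself, for example with toDF. The below sketch assumes sparkSession.implicits._ is in scope; the column name is just illustrative:

import sparkSession.implicits._

// converting the RDD back to a DataFrame needs an explicit extra step
val amountDF = amountRDD.toDF("amount")
amountDF.printSchema()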

RDD to Dataset Abstraction

The above code makes the port easy, but it doesn't provide good performance. Also, converting the RDD back to a DataFrame is tedious work. So rather than using the RDD functional APIs, you can use the Dataset functional APIs. The only constraint is that the return type of map should have an encoder. By default all primitive types and case classes are supported.

import sparkSession.implicits._
val amountDataSet = loadedDF.map(row => row.getDouble(3))

By importing sparkSession.implicits._ into scope, we import all the default encoders. As we are returning a Double value, there is a built in encoder for it.
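Case classes work the same way, since the implicits also provide product encoders. The Amount class below is a hypothetical example, not part of the original code:

// hypothetical case class; define it at the top level (outside any method)
// so Spark can derive a product encoder from sparkSession.implicits._
case class Amount(value: Double)

val amountCaseDS = loadedDF.map(row => Amount(row.getDouble(3)))
println(amountCaseDS.collect.toList)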

The Dataset code is more performant than the RDD code. So use the Dataset based functional APIs wherever you were using RDDs before, and only fall back to the RDD API when the Dataset API doesn't support the operation you need.

You can access complete code on github.

Conclusion

In this post we learnt how to port RDD based functional APIs to the more performant Dataset alternatives.