Spark 2.0 brings significant changes to the abstractions and APIs of the Spark platform. Along with a performance boost, this version makes some non-backward-compatible changes to the framework. To keep up with the latest updates, one needs to migrate a Spark 1.x code base to 2.x. Over the last few weeks, I was involved in migrating a fairly large code base and found it quite an involved process. In this series of posts, I will document my migration experience so that it may help anyone who is planning to do the same.

This is the fourth post in this series. In this post, we discuss cross joins. You can access all the posts here.

TL;DR You can access all the code on GitHub.

Joins in Spark SQL

Joins are one of the costliest operations in Spark, and in big data processing in general. So whenever we program in Spark, we try to avoid joins or restrict them to limited data. Spark offers various optimisations, from choosing the right type of join to using broadcast joins, to improve performance.

Cross Joins

A cross join, or cartesian product, is a kind of join where each row of one dataset is joined with every row of the other. So if we join a dataset of m rows with another dataset of n rows, we get a dataset with m*n rows.
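To make the m*n growth concrete, here is a minimal sketch in plain Scala collections, with no Spark involved. The helper name cartesian and the sample values are made up for illustration.

```scala
// Hypothetical helper: pairs every element of `left` with every element of `right`.
def cartesian[A, B](left: Seq[A], right: Seq[B]): Seq[(A, B)] =
  for {
    l <- left
    r <- right
  } yield (l, r)

val customers = Seq("c1", "c2", "c3") // m = 3
val items = Seq("i1", "i2")           // n = 2

// Every customer is paired with every item, so the result has m * n = 6 pairs.
val pairs = cartesian(customers, items)
println(pairs.size)
```

Adding one more element to either side adds a whole extra row or column of pairs, which is why the cost grows so quickly on large datasets.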

Cross joins are among the most time-consuming joins and should usually be avoided. But sometimes we introduce them accidentally, and we notice the performance problem only when they run on large data. So the ability to identify them as early as possible saves a lot of hassle.

Cross Joins in 1.x

The code below is an example of a cross join in 1.x. In this example, we do a self join without any condition.

val loadedDf = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("../test_data/sales.csv")

// cross join the data
val crossJoinDf = loadedDf.join(loadedDf)
// explain() prints the physical plan itself and returns Unit, so no println is needed
crossJoinDf.explain()
// count the joined df
println(crossJoinDf.count)

The plan looks like this:

CartesianProduct
:- Scan CsvRelation(<function0>,Some(../test_data/sales.csv),true,
,,",null,#,PERMISSIVE,COMMONS,false,
false,false,null,
false,null,,null,100000)[transactionId#0,
customerId#1,itemId#2,amountPaid#3]
+- Scan CsvRelation(<function0>,Some(../test_data/sales.csv),
true,,,",null,#,PERMISSIVE,COMMONS,false,
false,false,null,false,null,,null,100000)
[transactionId#9,customerId#10,itemId#11,amountPaid#12]

The CartesianProduct node at the top of the plan shows that Spark is computing a cartesian product.

You can access the complete code on GitHub.

Cross Join in 2.x

In 2.x, Spark added an explicit check for cartesian products. By default, every join rejects a cross product. So if you run the same code in 2.x, you will get the error below.

org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between logical plans

This makes sure that we don’t accidentally introduce cartesian products in our code.

Explicit Cross Join in 2.x

So if you really want a cross join, you need to be explicit in the code:

val crossJoinDf = loadedDf.crossJoin(loadedDf)

When the cross join is specified explicitly, Spark allows it. This helps the programmer avoid introducing a cross join accidentally.
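Here is a minimal, self-contained sketch of the explicit form. It assumes a local SparkSession and three made-up in-memory rows in place of sales.csv; the session settings, the name salesDf and the sample values are my assumptions, not part of the original code. As far as I know, crossJoin is available on DataFrames from Spark 2.1 onward.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is enough for this illustration.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("cross-join-sketch")
  .getOrCreate()
import spark.implicits._

// Made-up rows that use the same column names as sales.csv.
val salesDf = Seq(
  (111, 1, 1, 100.0),
  (112, 2, 2, 505.0),
  (113, 3, 3, 510.0)
).toDF("transactionId", "customerId", "itemId", "amountPaid")

// Explicit cross join: every row is paired with every row.
val crossJoinDf = salesDf.crossJoin(salesDf)

// Print the physical plan, then count the joined rows.
crossJoinDf.explain()
val joinedCount = crossJoinDf.count()
println(joinedCount)
```

With three input rows, the self cross join yields 3 * 3 = 9 rows, which matches the m*n rule described earlier.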

You can access the complete code on GitHub.

Conclusion

Spark 2.x makes sure that we don’t introduce a cross join accidentally. This check, built into the analyser, catches a costly class of mistakes early and so helps improve the performance of many workloads.