Spark 2.0 brings a significant changes to abstractions and API’s of spark platform. With performance boost, this version has made some of non backward compatible changes to the framework. To keep up to date with the latest updates, one need to migrate their spark 1.x code base to 2.x. In last few weeks, I was involved in migrating one of fairly large code base and found it quite involving process. In this series of posts, I will be documenting my experience of migration so it may help all the ones out there who are planning to do the same.

This is the eighth post in this series.In this post we will discuss about catalog support in spark sql. You can access all the posts here.

TL;DR You can access all the code on github.

Catalog API

In spark 1.x, spark heavily depended on hive for all metastore related operations. Even though sqlContext supported few of the DDL operations, most of them were very basic and not complete. So spark documentation often recommended using HiveContext over SQLContext. Also whenever user uses HiveContext, spark support for interacting with hive metastore was limited. So most of the metastore operation’s often done as embedded hive queries.

Spark 2.x changes all of this. It has exposed a full fledged user facing catalog API which works for both spark SQL and hive. Not only it supports the spark 1.x operations, it has added many new ones to improve the interaction with metastore.

In below sections, we will be discussing about porting earlier metastore operations to new catalog API.

Creating Table

Before we do any DDL operations, we need to create a table. For our example, we will use temporary tables.

Spark 1.x

val loadedDf = sqlContext.read.format("com.databricks.spark.csv").
  option("header", "true").load("../test_data/sales.csv")

loadedDf.registerTempTable("sales")

We use registerTempTable API for creating table in in-memory catalog.

Spark 2.x

val loadedDf = sparkSession.read.format("csv").option("header", "true").load("../test_data/sales.csv")

loadedDf.createOrReplaceTempView("sales")

In 2.x, registerTempTable API is deprecated. We should use createOrReplaceTempView for the same.

List Tables

Once we have table registered, first catalog operation is listing tables.

Spark 1.x

sqlContext.tables.show()

In spark 1.x, catalog API’s were added to context directly. The below is the output

+---------+-----------+
|tableName|isTemporary|
+---------+-----------+
|    sales|       true|
+---------+-----------+

In output, it specifies the name of the table and is it temporary or not. When we run same operation on hive metastore, isTemporary will be false.

Spark 2.x

In spark 2.x, there is separate API called catalog on spark session. It exposes all needed API’s.

sparkSession.catalog.listTables.show()

The output looks like below

+-----+--------+-----------+---------+-----------+
| name|database|description|tableType|isTemporary|
+-----+--------+-----------+---------+-----------+
|sales|    null|       null|TEMPORARY|       true|
+-----+--------+-----------+---------+-----------+

From output it is apparent that output of new catalog API is more richer than old one. Also from spark 2.x, it has added database information as part of the catalog which was missing in earlier API’s.

List Table Names

In spark 1.x, there is a API for listing just the name of the tables. The code looks below

sqlContext.tableNames()

In spark 2.x, there is no separate API for getting database names. As listTables, returns a Dataset we can use normal spark sql API’s for getting the name.

sparkSession.catalog.listTables.select("name").collect

Caching

As part of the catalog API, we can check is given table is cached or not.

Spark 1.x

println(sqlContext.isCached("sales"))

Spark 2.x

println(sparkSession.catalog.isCached("sales"))

External Tables

Till now, we worked with tables which we creating using dataframes. Let’s say we need to create table directly from a file without going through data source API. These often known as external tables.

Spark 1.x

In spark 1.x, SQLContext didn’t support creating external tables. So we need to use hivecontext for do that.

val hiveContext = new HiveContext(sparkContext)
 hiveContext.setConf("hive.metastore.warehouse.dir", "/tmp")
 hiveContext.createExternalTable("sales_external", "com.databricks.spark.csv", Map(
   "path" -> "../test_data/sales.csv",
   "header" -> "true"))
 hiveContext.table("sales_external").show()

In above code, first we create HiveContext. Then we need to set the warehouse directory so hive context knows where to keep the data. Then we use createExternalTable API to load the data to sales_external table.

Spark 2.x

sparkSession.catalog.createExternalTable("sales_external", "com.databricks.spark.csv", Map(
 "path" -> "../test_data/sales.csv",
 "header" -> "true"))
sparkSession.table("sales_external").show()

In spark 2.x, creating external table is part of catalog API itself. We don’t need to enable hive for this functionality.

Additional API’s

The above examples showed the porting of spark 1.x code to spark 2.x. But there are additional API’s in spark 2.x catalog which are useful in day to day development. Below sections discusses few of them.

List Functions

The below code list all the functions, built in and user defined. It helps us to know what are the Udfs, Udaf’s available in current session.

sparkSession.catalog.listFunctions.show()

The sample output looks as below

+--------------------+--------+-----------+--------------------+-----------+
|                name|database|description|           className|isTemporary|
+--------------------+--------+-----------+--------------------+-----------+
|                   !|    null|       null|org.apache.spark....|       true|
|                   %|    null|       null|org.apache.spark....|       true|
|                   &|    null|       null|org.apache.spark....|       true|
|                acos|    null|       null|org.apache.spark....|       true|
|          add_months|    null|       null|org.apache.spark....|       true|
|                 and|    null|       null|org.apache.spark....|       true|
|approx_count_dist...|    null|       null|org.apache.spark....|       true|
+--------------------+--------+-----------+--------------------+-----------+

List Columns

We can also list the columns of a table.

sparkSession.catalog.listColumns("sales").show()

The below is the output for our sales table.

+-------------+-----------+--------+--------+-----------+--------+
|         name|description|dataType|nullable|isPartition|isBucket|
+-------------+-----------+--------+--------+-----------+--------+
|transactionId|       null|  string|    true|      false|   false|
|   customerId|       null|  string|    true|      false|   false|
|       itemId|       null|  string|    true|      false|   false|
|   amountPaid|       null|  string|    true|      false|   false|
+-------------+-----------+--------+--------+-----------+--------+

Complete code

You can access complete code for spark 1.x here.

You can access complete code for spark 2.x here.

Conclusion

In this blog we have discussed about improvements in catalog API. Using new catalog API, you can get information of tables much easier than before.