Deriving Spark Dataframe schemas with Shapeless

June 14, 2017 - Spark, Scala, tutorial

A few months back I bought the excellent Type Astronaut’s Guide to Shapeless, which is, as its name implies, a comprehensive guide to Shapeless: a Scala library that makes generic programming easier.

A big part of the book is a practical tutorial on building a JSON encoder. As I read through this example, I was eager to find a similar use case where I could apply all this Shapeless knowledge.

By the end of the book, I had come up with the idea of deriving instances of Spark’s StructType.

What are StructTypes?

StructType is the way Spark internally represents DataFrame schemas:

case class Foo(a: Int, b: String)

val df = spark.createDataFrame(Seq(Foo(1, "a"), Foo(2, "b")))
// df: DataFrame = [a: int, b: string]

df.schema
// res0: StructType = StructType(StructField(a,IntegerType,false), StructField(b,StringType,true))

The problem is interacting with them.

The problem

When reading a DataFrame/Dataset from an external data source, the schema, unless specified, has to be inferred.

This is especially the case when reading:

- JSON files
- CSV files

In both cases, inferring the schema for your DataFrame translates into looking at every record of all the files we need to read and coming up with a schema that can satisfy each of these rows (basically the disjunction of all the possible schemas).
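To make this concrete, here is a minimal sketch of what inference looks like (the data is made up, and it assumes a Spark version where DataFrameReader.json accepts a Dataset[String], i.e. 2.2+): two records with different fields get merged into one schema that accommodates both.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// two records with different fields: Spark has to look at both of them...
val records = Seq("""{"a": 1}""", """{"a": 2, "b": "b"}""").toDS()

// ...and comes up with a schema satisfying each of them
spark.read.json(records).printSchema()
// root
//  |-- a: long (nullable = true)
//  |-- b: string (nullable = true)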

As the benchmarks at the end of this post will show, this is a time-consuming task which can be avoided by specifying the schema in advance:

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import spark.implicits._ // provides the implicit Encoder[Foo] needed by .as[Foo]

case class Foo(a: Int, b: String)

val schema = StructType(
  StructField("a", IntegerType) ::
  StructField("b", StringType) :: Nil
)

val df = spark
  .read
  .schema(schema)
  .json("/path/to/*.json")
  .as[Foo]

However, writing schemas again and again in order to avoid having Spark infer them is a lot of tedious boilerplate, especially when dealing with complex domain-specific data models.

That’s where struct-type-encoder comes into play.

The solution

struct-type-encoder provides a boilerplate-free way of specifying a StructType when reading a DataFrame:

import ste._
val df = spark
  .read
  .schema(StructTypeEncoder[Foo].encode)
  .json("/path/to/*.json")
  .as[Foo]

Under the hood

In this section, I’ll detail the few lines of code behind the project.

The first step is to define a DataTypeEncoder type class which will be responsible for mapping Scala types to Spark DataTypes.

import org.apache.spark.sql.types.DataType

sealed trait DataTypeEncoder[A] {
  def encode: DataType
}

object DataTypeEncoder {
  def apply[A](implicit enc: DataTypeEncoder[A]): DataTypeEncoder[A] = enc

  def pure[A](dt: DataType): DataTypeEncoder[A] =
    new DataTypeEncoder[A] { def encode: DataType = dt }
}

We can now define a couple of primitive instances, e.g. for Int and String:

import org.apache.spark.sql.types.{IntegerType, StringType}

implicit val intEncoder: DataTypeEncoder[Int]       = pure(IntegerType)
implicit val stringEncoder: DataTypeEncoder[String] = pure(StringType)

and a couple of combinator instances:

import org.apache.spark.sql.types.{ArrayType, MapType}
import scala.collection.generic.IsTraversableOnce

// encodes any collection C[A0] as an ArrayType of the encoding of A0
implicit def encodeTraversableOnce[A0, C[_]](
  implicit
  enc: DataTypeEncoder[A0],
  is: IsTraversableOnce[C[A0]] { type A = A0 }
): DataTypeEncoder[C[A0]] =
  pure(ArrayType(enc.encode))

// encodes a Map[K, V] as a MapType of the encodings of K and V
implicit def mapEncoder[K, V](
  implicit
  kEnc: DataTypeEncoder[K],
  vEnc: DataTypeEncoder[V]
): DataTypeEncoder[Map[K, V]] =
  pure(MapType(kEnc.encode, vEnc.encode))

Credit to circe for the collection encoder.
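As a quick sanity check (assuming the instances above are in implicit scope, e.g. defined in the DataTypeEncoder companion object), summoning an encoder gives back the corresponding Spark DataType:

DataTypeEncoder[Int].encode
// res0: DataType = IntegerType

DataTypeEncoder[Map[String, List[Int]]].encode
// res1: DataType = MapType(StringType,ArrayType(IntegerType,true),true)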

The second step is to derive StructTypes from case classes leveraging Shapeless’ HLists:

import org.apache.spark.sql.types.{StructField, StructType}

sealed trait StructTypeEncoder[A] extends DataTypeEncoder[A] {
  def encode: StructType
}

object StructTypeEncoder {
  def apply[A](implicit enc: StructTypeEncoder[A]): StructTypeEncoder[A] = enc

  def pure[A](st: StructType): StructTypeEncoder[A] =
    new StructTypeEncoder[A] { def encode: StructType = st }
}

We can now define our instances for HList:

import shapeless.{::, HList, HNil, LabelledGeneric, Lazy, Witness}
import shapeless.labelled.FieldType

implicit val hnilEncoder: StructTypeEncoder[HNil] = pure(StructType(Nil))

implicit def hconsEncoder[K <: Symbol, H, T <: HList](
  implicit
  witness: Witness.Aux[K],
  hEncoder: Lazy[DataTypeEncoder[H]],
  tEncoder: StructTypeEncoder[T]
): StructTypeEncoder[FieldType[K, H] :: T] = {
  // the field name is carried at the type level by the singleton type K
  val fieldName = witness.value.name
  pure {
    val head = hEncoder.value.encode
    val tail = tEncoder.encode
    // prepend the encoded head field to the fields of the already encoded tail
    StructType(StructField(fieldName, head) +: tail.fields)
  }
}

// maps a case class A to the encoding of its LabelledGeneric representation H
implicit def genericEncoder[A, H <: HList](
  implicit
  generic: LabelledGeneric.Aux[A, H],
  hEncoder: Lazy[StructTypeEncoder[H]]
): StructTypeEncoder[A] =
  pure(hEncoder.value.encode)

And that’s it! We can now map case classes to StructTypes.
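For instance, on a small made-up nested model (assuming the instances above are in implicit scope; output reformatted for readability):

case class Inner(xs: List[Int], m: Map[String, String])
case class Outer(a: Int, b: String, inner: Inner)

StructTypeEncoder[Outer].encode
// res0: StructType = StructType(
//   StructField(a,IntegerType,true),
//   StructField(b,StringType,true),
//   StructField(inner,StructType(
//     StructField(xs,ArrayType(IntegerType,true),true),
//     StructField(m,MapType(StringType,StringType,true),true)),true))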

Benchmarks

I wanted to measure how much time was saved by specifying the schema with struct-type-encoder compared to letting Spark infer it.

In order to do so, I wrote a couple of benchmarks using JMH: one for JSON and one for CSV. Each of these benchmarks writes a thousand files containing a hundred lines each and measures the time spent reading the resulting Dataset.
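For illustration, here is a stripped-down sketch of what the JSON benchmark can look like with sbt-jmh (class name, path and setup are made up here; the actual benchmark code lives in the repository):

import org.apache.spark.sql.SparkSession
import org.openjdk.jmh.annotations._
import ste._

case class Foo(a: Int, b: String)

@State(Scope.Benchmark)
class JsonReadBenchmark {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // assumes the thousand JSON files of a hundred lines each have been written beforehand
  val path = "/tmp/ste-benchmark/*.json"

  @Benchmark
  def inferred(): Array[Foo] =
    spark.read.json(path).as[Foo].collect()

  @Benchmark
  def derived(): Array[Foo] =
    spark.read.schema(StructTypeEncoder[Foo].encode).json(path).as[Foo].collect()
}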

The following table sums up the results I gathered:

         derived            inferred
CSV      5.936 ± 0.035 s    6.494 ± 0.209 s
JSON     5.092 ± 0.048 s    6.019 ± 0.049 s

As we can see, bypassing the schema inference done by Spark using struct-type-encoder takes about 15.4% less time when reading JSON and about 8.6% less when reading CSV.

The code for the benchmarks can be found in the project repository (linked below).

They can be run using the dedicated sbt plugin sbt-jmh with the jmh:run .*Benchmark command.

Get the code

If you’d like to use struct-type-encoder, it’s available on Maven Central as "com.github.benfradet" %% "struct-type-encoder" % "0.1.0".
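In an sbt build, that boils down to:

// in build.sbt
libraryDependencies += "com.github.benfradet" %% "struct-type-encoder" % "0.1.0"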

The repo can be found on GitHub: https://github.com/BenFradet/struct-type-encoder/.

Conclusion

Before reading Dave Gurnell’s book, I had heard about Shapeless but never really felt the need to dig deeper. This small project really made me understand the value that Shapeless brings to the table.

About half-way through the project, I discovered frameless, a library which aims to bring back type safety to Spark SQL. It turns out that they use a similar mechanism in TypedEncoder. The goal now is to expose the functionality proposed by struct-type-encoder in frameless.

Additionally, for 0.2.0, I want to start experimenting with coproducts and see what’s possible on Spark’s side of things.