A few months back I bought the excellent Type Astronaut’s Guide to Shapeless, which is, as its name implies, a comprehensive guide to Shapeless: a Scala library making generic programming easier.
A big part of the book is a practical tutorial on building a JSON encoder. As I read through this example, I was eager to find a similar use case where I could apply all this Shapeless knowledge.
By the end of the book, I came up with the idea of deriving instances of Spark’s StructType.
StructType is the way Spark internally represents DataFrame schemas:
case class Foo(a: Int, b: String)
val df = spark.createDataFrame(Seq(Foo(1, "a"), Foo(2, "b")))
// df: DataFrame = [a: int, b: string]
df.schema
// res0: StructType = StructType(StructField(a,IntegerType,false), StructField(b,StringType,true))
The problem is interacting with them.
When reading a DataFrame/Dataset from an external data source, the schema, unless specified, has to be inferred.
This is especially the case when reading semi-structured formats such as JSON and CSV.
Inferring the schema for your DataFrame translates into looking at every record of all the files we need to read and coming up with a schema that can satisfy every one of these rows (basically the disjunction of all the possible schemas).
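To make this concrete, here is a small hypothetical illustration (the records below are made up) of Spark merging the fields it sees across JSON records into a single schema:
import spark.implicits._

// two records with different fields
val inferred = spark.read.json(Seq(
  """{"a": 1, "b": "x"}""",
  """{"a": 2, "c": true}"""
).toDS)

inferred.printSchema()
// root
//  |-- a: long (nullable = true)
//  |-- b: string (nullable = true)
//  |-- c: boolean (nullable = true)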
As the benchmarks at the end of this post will show, this is a time-consuming task which can be avoided by specifying the schema in advance:
import org.apache.spark.sql.types._
import spark.implicits._

case class Foo(a: Int, b: String)

val schema = StructType(
  StructField("a", IntegerType) ::
  StructField("b", StringType) :: Nil
)

val df = spark
  .read
  .schema(schema)
  .json("/path/to/*.json")
  .as[Foo]
However, writing schemas again and again in order to avoid having Spark infer them is a lot of tedious boilerplate, especially when dealing with complex domain-specific data models.
That’s where struct-type-encoder comes into play.
struct-type-encoder provides a boilerplate-free way of specifying a StructType when reading a DataFrame:
import ste._
val df = spark
.read
.schema(StructTypeEncoder[Foo].encode)
.json("/path/to/*.json")
.as[Foo]
In this section, I’ll detail the few lines of code behind the project.
The first step is to define a DataTypeEncoder type class which will be responsible for the mapping between Scala types and Spark DataTypes.
import org.apache.spark.sql.types.DataType

sealed trait DataTypeEncoder[A] {
  def encode: DataType
}

object DataTypeEncoder {
  def apply[A](implicit enc: DataTypeEncoder[A]): DataTypeEncoder[A] = enc

  def pure[A](dt: DataType): DataTypeEncoder[A] =
    new DataTypeEncoder[A] { def encode: DataType = dt }
}
We can now define a couple of primitive instances, e.g. for Int and String:
implicit val intEncoder: DataTypeEncoder[Int] = pure(IntegerType)
implicit val stringEncoder: DataTypeEncoder[String] = pure(StringType)
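Assuming these instances are in implicit scope (e.g. defined inside the DataTypeEncoder companion object, so that pure resolves), they can be summoned through the apply method defined above:
// summon the primitive instances
DataTypeEncoder[Int].encode    // IntegerType
DataTypeEncoder[String].encode // StringType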
and a couple of combinator instances:
import scala.collection.generic.IsTraversableOnce
import org.apache.spark.sql.types.{ArrayType, MapType}

implicit def encodeTraversableOnce[A0, C[_]](
  implicit
  enc: DataTypeEncoder[A0],
  is: IsTraversableOnce[C[A0]] { type A = A0 }
): DataTypeEncoder[C[A0]] =
  pure(ArrayType(enc.encode))

implicit def mapEncoder[K, V](
  implicit
  kEnc: DataTypeEncoder[K],
  vEnc: DataTypeEncoder[V]
): DataTypeEncoder[Map[K, V]] =
  pure(MapType(kEnc.encode, vEnc.encode))
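These combinators compose recursively, so nested collection types resolve without any extra code; a quick hypothetical check:
DataTypeEncoder[List[Int]].encode
// ArrayType(IntegerType,true)

DataTypeEncoder[Map[String, Seq[Int]]].encode
// MapType(StringType,ArrayType(IntegerType,true),true)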
Credit to circe for the collection encoder.
The second step is to derive StructTypes from case classes, leveraging Shapeless’ HLists:
import org.apache.spark.sql.types.StructType

sealed trait StructTypeEncoder[A] extends DataTypeEncoder[A] {
  def encode: StructType
}

object StructTypeEncoder {
  def apply[A](implicit enc: StructTypeEncoder[A]): StructTypeEncoder[A] = enc

  def pure[A](st: StructType): StructTypeEncoder[A] =
    new StructTypeEncoder[A] { def encode: StructType = st }
}
We can now define our instances for HList:
import org.apache.spark.sql.types.{StructField, StructType}
import shapeless.{::, HList, HNil, LabelledGeneric, Lazy, Witness}
import shapeless.labelled.FieldType

implicit val hnilEncoder: StructTypeEncoder[HNil] = pure(StructType(Nil))

implicit def hconsEncoder[K <: Symbol, H, T <: HList](
  implicit
  witness: Witness.Aux[K],
  hEncoder: Lazy[DataTypeEncoder[H]],
  tEncoder: StructTypeEncoder[T]
): StructTypeEncoder[FieldType[K, H] :: T] = {
  val fieldName = witness.value.name
  pure {
    val head = hEncoder.value.encode
    val tail = tEncoder.encode
    StructType(StructField(fieldName, head) +: tail.fields)
  }
}
implicit def genericEncoder[A, H <: HList](
  implicit
  generic: LabelledGeneric.Aux[A, H],
  hEncoder: Lazy[StructTypeEncoder[H]]
): StructTypeEncoder[A] =
  pure(hEncoder.value.encode)
And that’s it! We can now map case classes to StructTypes.
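For example, with a hypothetical nested model (Bar and Foo below are made up) and assuming all the instances defined above are in implicit scope:
case class Bar(c: Seq[Int], d: Map[String, Int])
case class Foo(a: Int, b: String, bar: Bar)

StructTypeEncoder[Foo].encode
// Roughly:
// StructType(
//   StructField(a,IntegerType,true),
//   StructField(b,StringType,true),
//   StructField(bar,StructType(
//     StructField(c,ArrayType(IntegerType,true),true),
//     StructField(d,MapType(StringType,IntegerType,true),true)),true))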
I wanted to measure how much time was saved specifying the schema using struct-type-encoder compared to letting Spark infer it.
In order to do so, I wrote a couple of benchmarks using JMH: one for JSON and one for CSV. Each of these benchmarks writes a thousand files containing a hundred lines each and measures the time spent reading the Dataset.
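For reference, here is a minimal sketch of what such a benchmark could look like with sbt-jmh. This is an illustration only: the data model, paths, and setup below are made up and are not the actual benchmark code from the repository.
import java.util.concurrent.TimeUnit
import org.apache.spark.sql.SparkSession
import org.openjdk.jmh.annotations._
import ste._

// hypothetical data model matching the generated files
case class Foo(a: Int, b: String)

@State(Scope.Benchmark)
@BenchmarkMode(Array(Mode.AverageTime))
@OutputTimeUnit(TimeUnit.SECONDS)
class JsonBenchmark {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // hypothetical location of the pre-generated JSON files
  val path = "/tmp/ste-benchmark/*.json"

  @Benchmark
  def derived(): Unit =
    spark.read.schema(StructTypeEncoder[Foo].encode).json(path).as[Foo].collect()

  @Benchmark
  def inferred(): Unit =
    spark.read.json(path).as[Foo].collect()
}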
The following table sums up the results I gathered:
|      | derived         | inferred        |
|------|-----------------|-----------------|
| CSV  | 5.936 ± 0.035 s | 6.494 ± 0.209 s |
| JSON | 5.092 ± 0.048 s | 6.019 ± 0.049 s |
As we can see, bypassing the schema inference done by Spark using struct-type-encoder takes 16.7% less time when reading JSON and 8.98% less when reading CSV.
The code for the benchmarks can be found here.
They can be run using the dedicated sbt plugin, sbt-jmh, with the jmh:run .*Benchmark command.
If you’d like to use struct-type-encoder, it’s available on Maven Central as:
"com.github.benfradet" %% "struct-type-encoder" % "0.1.0"
The repo can be found on github: https://github.com/BenFradet/struct-type-encoder/.
Before reading Dave Gurnell’s book, I had heard about Shapeless but never really felt the need to dig deeper. This small project really made me understand the value that Shapeless brings to the table.
About half-way through the project, I discovered frameless, a library which aims to bring back type safety to Spark SQL. It turns out that they use a similar mechanism in TypedEncoder. The goal now is to expose the functionality proposed by struct-type-encoder in frameless.
Additionally, for 0.2.0, I want to start experimenting with coproducts and see what’s possible on Spark’s side of things.