It’s been a while since my last post, and in this post I’m going to talk about a technology I’ve been using for almost a year now: Apache Spark.
Basically, Spark lets you do data processing in a distributed manner. The project is subdivided into modules: Spark Core, Spark SQL, Spark Streaming, MLlib (machine learning) and GraphX (graph processing).
Today, I’ll talk about MLlib which is, as previously mentioned, the Spark submodule dedicated to machine learning. This submodule is split in two: spark.mllib which is built on top of the old RDDs and spark.ml which is built on top of the DataFrame API. In this post, I’ll talk exclusively about spark.ml which aims to ease the process of creating machine learning pipelines.
If you want to follow this tutorial you will have to download Spark, which can be done here. Additionally, you will need a few dependencies in order to build your project:
| groupId | artifactId | version | scope |
|---|---|---|---|
| org.apache.spark | spark-core_2.10 | 1.5.2 | provided |
| org.apache.spark | spark-sql_2.10 | 1.5.2 | compile |
| org.apache.spark | spark-mllib_2.10 | 1.5.2 | compile |
| com.databricks | spark-csv_2.10 | 1.2.0 | compile |
We’ll be using the Titanic dataset taken from a Kaggle competition. The goal is to predict whether a passenger survived from a set of features such as the class the passenger was in, his or her age, or the fare the passenger paid to get on board.
You can find the code for this post on Github.
You can find a description of the features on Kaggle.
The dataset is split in two: `train.csv` and `test.csv`. As you’ve probably already guessed, `train.csv` will contain labeled data (the `Survived` column will be filled) and `test.csv` will be unlabeled data. The goal is to predict for each example/passenger in `test.csv` whether or not he/she survived.

Since the data is in csv format, we’ll use `spark-csv` which will parse our csv data and give us back `DataFrame`s.
To load the `train.csv` and `test.csv` files, I wrote the following function:
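A minimal sketch of what such a function could look like, assuming the standard Titanic columns (the exact types and options here are my assumptions, not the original code):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types._

def loadData(trainFile: String, testFile: String, sqlContext: SQLContext): (DataFrame, DataFrame) = {
  val nullable = true
  // schema shared by both files, Survived being present only in train.csv
  val schemaArray = Array(
    StructField("PassengerId", IntegerType, nullable),
    StructField("Survived", IntegerType, nullable),
    StructField("Pclass", IntegerType, nullable),
    StructField("Name", StringType, nullable),
    StructField("Sex", StringType, nullable),
    StructField("Age", DoubleType, nullable),
    StructField("SibSp", IntegerType, nullable),
    StructField("Parch", IntegerType, nullable),
    StructField("Ticket", StringType, nullable),
    StructField("Fare", DoubleType, nullable),
    StructField("Cabin", StringType, nullable),
    StructField("Embarked", StringType, nullable)
  )
  val trainSchema = StructType(schemaArray)
  val testSchema = StructType(schemaArray.filterNot(_.name == "Survived"))

  // spark-csv reader: header line present, explicit schema
  def load(path: String, schema: StructType): DataFrame =
    sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .schema(schema)
      .load(path)

  (load(trainFile, trainSchema), load(testFile, testSchema))
}
```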
This function takes the paths to the `train.csv` and `test.csv` files as its first two arguments, and a `sqlContext` which will have been initialized beforehand like so:
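A sketch of that initialization (the application name is just an example):

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("titanic")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```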
Although not mandatory, we define the schema for the data as it is the same for both files except for the `Survived` column. Then, we use spark-csv to load our data.
Next, we’ll do a bit of feature engineering on this dataset.
If you have a closer look at the `Name` column, you can probably see that there is some kind of title included in the name, such as “Sir”, “Mr”, “Mrs”, etc. I think it is a valuable piece of information that can influence whether someone survived or not, which is why I extracted it into its own column.

My first intuition was to extract this title with a regex, with the help of a UDF (user-defined function):
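For example, something along these lines; the exact regex is an assumption, it simply grabs the word sitting between the comma and the dot in the name:

```scala
import org.apache.spark.sql.functions.udf

// names look like "Braund, Mr. Owen Harris": capture what sits between the comma and the dot
val Pattern = ".*, (.*?)\\..*".r

val extractTitle: (String => String) = {
  case Pattern(title) => title
  case _              => ""
}
val extractTitleUDF = udf(extractTitle)
```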
Unfortunately, not every passenger’s name complies with this regex, which resulted in some noise in the `Title` column. As a result, I just looked at the distinct titles produced by my UDF and adapted it a bit:
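Here is a sketch of such an adapted UDF; the exact contents of the `titles` map are an assumption, only its shape matters:

```scala
import org.apache.spark.sql.functions.col

// maps the raw titles extracted by the regex to a small set of canonical ones
// (the mapping below is illustrative)
val titles = Map(
  "Mr"   -> "Mr",  "Mrs"  -> "Mrs", "Miss" -> "Miss", "Master"   -> "Master",
  "Ms"   -> "Mrs", "Mme"  -> "Mrs", "Mlle" -> "Miss", "Lady"     -> "Mrs",
  "Sir"  -> "Mr",  "Dr"   -> "Mr",  "Rev"  -> "Mr",   "Major"    -> "Mr",
  "Col"  -> "Mr",  "Capt" -> "Mr",  "Don"  -> "Mr",   "Jonkheer" -> "Mr"
)

val title: ((String, String) => String) = (name, sex) => name match {
  // regex matched: look the title up, fall back on the Sex column if unknown
  case Pattern(t) => titles.getOrElse(t, if (sex == "male") "Mr" else "Mrs")
  // regex didn't match at all: fall back on the Sex column
  case _          => if (sex == "male") "Mr" else "Mrs"
}
val titleUDF = udf(title)

// apply it to both DataFrames
val trainWithTitle = trainDF.withColumn("Title", titleUDF(col("Name"), col("Sex")))
val testWithTitle  = testDF.withColumn("Title", titleUDF(col("Name"), col("Sex")))
```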
This UDF tries to match on the previously defined pattern `Pattern`. If the regex matches, we try to find the title in our `titles` map. Finally, if we don’t find it there, we define the title based on the `Sex` column: “Mr” if “male”, “Mrs” otherwise.
I then wanted to represent the family size of each passenger with the help of the `Parch` column (which represents the number of parents/children aboard the Titanic) and the `SibSp` column (which represents the number of siblings/spouses aboard):
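A possible sketch, reusing the DataFrames from the previous step (the `FamilySize` column name is my choice):

```scala
val familySize: ((Int, Int) => Int) = (sibSp, parch) => sibSp + parch + 1
val familySizeUDF = udf(familySize)

// add the new column to both DataFrames
val trainWithFamily = trainWithTitle.withColumn("FamilySize", familySizeUDF(col("SibSp"), col("Parch")))
val testWithFamily  = testWithTitle.withColumn("FamilySize", familySizeUDF(col("SibSp"), col("Parch")))
```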
The family size UDF just sums the `SibSp` and `Parch` columns and adds one (the passenger himself or herself).
You have two options when dealing with NA values: either remove the examples containing them, or replace them with sensible values.
After noticing that NA values were present in the `Age`, `Fare` and `Embarked` columns, I chose to replace them:

- by the average age for the `Age` column
- by the average fare for the `Fare` column
- by “S” for the `Embarked` column, “S” representing the city of Southampton

In order to do this, I calculated the average of the `Age` column like so:
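For instance (`avg` ignores null values):

```scala
import org.apache.spark.sql.functions.avg

val avgAge = trainWithFamily.select(avg("Age")).first().getDouble(0)
```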
Same thing for the `Fare` column. I then filled my dataset with those averages:
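Along these lines, with `avgFare` computed the same way as `avgAge`:

```scala
val avgFare = trainWithFamily.select(avg("Fare")).first().getDouble(0)

// replace missing ages and fares with the computed averages
val trainFilled = trainWithFamily.na.fill(Map("Age" -> avgAge, "Fare" -> avgFare))
val testFilled  = testWithFamily.na.fill(Map("Age" -> avgAge, "Fare" -> avgFare))
```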
Another option, which I won’t cover here, is to train a regression model on the `Age` column and use this model to predict the age for the examples where `Age` is NA. The same goes for handling NAs in the `Fare` column.
However, spark-csv treats NA strings as empty strings instead of NAs (this is a known bug described here). This is why I coded a UDF which transforms empty strings in the `Embarked` column to “S” for Southampton:
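A sketch of that UDF; the resulting `trainData`/`testData` names are assumptions used in the rest of the post:

```scala
// spark-csv gives us "" instead of null for missing Embarked values
val fillEmbarked: (String => String) = {
  case "" | null => "S"
  case e         => e
}
val fillEmbarkedUDF = udf(fillEmbarked)

val trainData = trainFilled.withColumn("Embarked", fillEmbarkedUDF(col("Embarked")))
val testData  = testFilled.withColumn("Embarked", fillEmbarkedUDF(col("Embarked")))
```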
What’s very interesting about spark.ml compared to spark.mllib, aside from dealing with DataFrames instead of RDDs, is the fact that you can build and tune your own machine learning pipeline as we’ll see in a bit.
There are two main concepts in spark.ml (extracted from the guide):

- `Transformer`s, which are algorithms that transform a DataFrame into another one. For example, a machine learning model is a `Transformer` which transforms DataFrames with features into DataFrames with predictions.
- `Estimator`s, which are algorithms that can be fit on a DataFrame to produce a `Transformer`. For example, a learning algorithm is an `Estimator` which trains on a DataFrame to produce a machine learning model (which is a `Transformer`).

A pipeline is an ordered combination of `Transformer`s and `Estimator`s.
In this post, we’ll be training a random forest and since spark.ml can’t handle categorical features or labels unless they are indexed, our first job will be to do just that.
Then, we’ll assemble all our feature columns into one vector column because every spark.ml machine learning algorithm expects that.
Once this is done, we can train our random forest as our data is in the expected format.
Finally, we’ll have to unindex our labels so that they are interpretable by us and by the Kaggle tester.
Fortunately, there are already built-in transformers to index categorical features; we just have to choose between two options:

- use `VectorIndexer`, which automatically detects categorical features from an assembled feature vector. The problem is that it will index every feature which has fewer than `maxCategories` distinct values (which you can set with `setMaxCategories`), no matter whether it is indeed categorical or not. In our case, there are categorical features with quite a few categories (`Title` for example) and quantitative features without too many different values (such as `SibSp` or `Parch`). That’s why I don’t think this is the way to go.
- use a `StringIndexer` for each categorical feature. The downside is that there is no `StringIndexer` which will index all your categorical features in one step (there is a PR going on to do just that here, though).

We will proceed with option 2 in order to have a bit more control over which features get indexed:
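A sketch, assuming the categorical columns are `Pclass`, `Sex`, `Embarked` and the engineered `Title`:

```scala
import org.apache.spark.ml.feature.StringIndexer

val categoricalFeatColNames = Seq("Pclass", "Sex", "Embarked", "Title")

// one StringIndexer per categorical column, each producing a "<name>Indexed" column;
// fitting on the training data assumes the test data contains no unseen categories
val stringIndexers = categoricalFeatColNames.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed")
    .fit(trainData)
}
```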
We also index our label, which corresponds to the `Survived` column:
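For example; fitting it right away gives us access to the label values, which we’ll need later to convert the predictions back:

```scala
val labelIndexerModel = new StringIndexer()
  .setInputCol("Survived")
  .setOutputCol("SurvivedIndexed")
  .fit(trainData)
```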
Now that our indexing is done, we just need to assemble all our feature columns into a single column containing a vector of all our features. To do that, we’ll use the built-in VectorAssembler transformer:
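Something like the following, where the list of numeric columns is an assumption:

```scala
import org.apache.spark.ml.feature.VectorAssembler

val numericFeatColNames = Seq("Age", "SibSp", "Parch", "Fare", "FamilySize")
val indexedCategoricalColNames = categoricalFeatColNames.map(_ + "Indexed")

val assembler = new VectorAssembler()
  .setInputCols((numericFeatColNames ++ indexedCategoricalColNames).toArray)
  .setOutputCol("Features")
```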
We’ll now have two additional columns:

- `SurvivedIndexed`, containing our indexed label
- `Features`, containing a vector of our different features (quantitative and indexed categorical)
Now that our data is in the proper format expected by spark.ml we can use a classifier. Here I’ll use a RandomForestClassifier but since our data is properly formatted we can replace it by any spark.ml classifier we want:
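A minimal instantiation, wired to the columns created above:

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

val randomForest = new RandomForestClassifier()
  .setLabelCol("SurvivedIndexed")
  .setFeaturesCol("Features")
```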
We’ll also need an `IndexToString` at the end of our pipeline. `IndexToString` is the reverse operation of `StringIndexer`: it converts our indexes back to the original labels so that they are interpretable. Indeed, as indicated in the documentation for random forests, calling the `transform` method of the model produced by the `RandomForestClassifier` will produce a `prediction` column containing indexed labels, which we need to convert back.
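A sketch, reusing the labels captured by `labelIndexerModel` above:

```scala
import org.apache.spark.ml.feature.IndexToString

// turn the indexed "prediction" column back into original Survived values
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexerModel.labels)
```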
Since all our different steps have been implemented, we can create our pipeline:
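Putting the stages together, in order:

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}

val pipeline = new Pipeline().setStages(
  stringIndexers.toArray[PipelineStage] ++
    Array[PipelineStage](labelIndexerModel, assembler, randomForest, labelConverter))
```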
We first apply the `StringIndexer`s for our categorical features and our label, then we assemble every feature into one column. We then train our random forest and finally convert the predicted indexed labels back to the original ones.
In order to select the best model, you’ll often find yourself performing a grid search over a set of parameters: for each combination of parameters you run cross validation and keep the best model according to some performance indicator.
This is a bit tedious, and spark.ml aims to simplify it with an easy-to-use API.
A quick reminder if you don’t know what cross validation is: you choose a number k of folds, for example 3. Your dataset is split into three parts, and from those 3 parts, 3 different pairs of training and test data are generated (2/3 of the data for training and 1/3 for testing). The model is then evaluated on the average of the chosen performance indicator over the three pairs.
First, we’re going to want to create a grid of parameters:
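For example, with a few illustrative values for `maxBins`, `maxDepth` and `impurity` (the actual values to explore are up to you):

```scala
import org.apache.spark.ml.tuning.ParamGridBuilder

val paramGrid = new ParamGridBuilder()
  .addGrid(randomForest.maxBins, Array(25, 28, 31))
  .addGrid(randomForest.maxDepth, Array(4, 6, 8))
  .addGrid(randomForest.impurity, Array("entropy", "gini"))
  .build()
```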
The different parameters for spark.ml’s random forests can be found in the scaladoc.
Next, we need to define an `Evaluator` which, as its name implies, will evaluate our model according to some metric. There are three built-in evaluators: one for regression, one for binary classification and another one for multiclass classification. In our case, we’re only interested in the `BinaryClassificationEvaluator`.
The default metric used for binary classification is the area under the ROC curve.
A `BinaryClassificationEvaluator` can be created in the following way:
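For instance, pointing it at our indexed label column:

```scala
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val evaluator = new BinaryClassificationEvaluator()
  .setLabelCol("SurvivedIndexed")
```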
However, another metric is available for binary classification: the area under the precision-recall curve which can be used with:
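For example:

```scala
val prEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("SurvivedIndexed")
  .setMetricName("areaUnderPR")
```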
We also need an `Estimator` to be trained; in our case, it will be our whole pipeline.
Finally, after choosing k=10, the number of folds the data will be split into during cross validation, we can create a `CrossValidator` object like so:
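For example:

```scala
import org.apache.spark.ml.tuning.CrossValidator

val crossValidator = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(10)
```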
Since our `CrossValidator` is an `Estimator`, we can obtain the best model for our data by calling the `fit` method on it:
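Assuming `trainData` is the fully prepared training DataFrame from the feature-engineering sketches:

```scala
val crossValidatorModel = crossValidator.fit(trainData)
```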
We can now make predictions on the `test.csv` file given by Kaggle:
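Again assuming `testData` went through the same feature engineering:

```scala
val predictions = crossValidatorModel.transform(testData)
```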
WARNING: You have to be careful when running cross validation, especially on bigger datasets, as it will train k x p models, where k is the number of folds used for cross validation and p is the product of the number of values for each parameter in your grid.
If we go back to our previous example with k=10 and the following parameter grid:
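That is, the illustrative grid sketched earlier:

```scala
new ParamGridBuilder()
  .addGrid(randomForest.maxBins, Array(25, 28, 31))          // 3 values
  .addGrid(randomForest.maxDepth, Array(4, 6, 8))            // 3 values
  .addGrid(randomForest.impurity, Array("entropy", "gini"))  // 2 values
  .build()
```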
We get p = 3 x 3 x 2 = 18, so our cross validation will train k x p = 10 x 18 = 180 different models.
Now that we have our predictions, we just need to transform our data to fit the format expected by Kaggle and save it to a csv file:
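A sketch of this last step; the output path is arbitrary and the column names follow the Kaggle submission format:

```scala
predictions
  .withColumn("Survived", col("predictedLabel"))
  .select("PassengerId", "Survived")
  .coalesce(1)                                  // a single output file
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("submission")
```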
There is still one more step to perform: the output will unfortunately be a directory containing a file named in the `part-[0-9]{5}` Hadoop format. As a result, I wrote a little script to launch the Spark job and rename the output file so it is ready to be submitted to Kaggle:
I hope this was an interesting introduction to spark.ml and that I could convey the simplicity and expressiveness of the API.
For reference, I managed to score 0.80383 in the competition.