This post will detail how I built my entry to the Kaggle San Francisco crime classification competition using Apache Spark and the new ML library.
We’ll be exploring the San Francisco crime dataset, which contains crimes that took place between 2003 and 2015, as detailed on the Kaggle competition page.
You can find the code for this post on Github.
The dataset given by Kaggle is split into two archives: one for training and one for testing.
The training and test datasets contain a few features:
Dates: timestamp of the crime incident in the PST timezone
Descript: detailed description of the crime incident (unfortunately this is only available in the training dataset, so it’ll be pretty much useless for prediction)
DayOfWeek: the day of the week of the crime incident
PdDistrict: name of the police department district which handled the crime incident
Resolution: how the crime incident was resolved (it’s also only in the training dataset)
Address: the approximate street address of the incident
X: longitude of the incident
Y: latitude of the incident
The goal of the competition is to predict the Category variable, which corresponds to the type of offense committed. For example, here are the ten most common categories:
Since the data is in csv format, we’ll use spark-csv, which will parse our csv and give us back dataframes:
This function takes the paths to the uncompressed training and test files as well as a sqlContext, which will have been initialized beforehand like so:
I should point out that you can let Spark infer the schema. However, I tend to always specify the type of every field to make sure every value of every variable is of the expected type.
We can now load our data:
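The loading code itself is not shown above; under the assumption that it uses spark-csv 1.x with an explicit schema (the column names come from the feature list, everything else in this sketch is my own), it might look like:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types._

// Sketch only: explicit schema so every field gets the expected type.
// Column names come from the Kaggle data description; test.csv lacks the
// train-only columns (Category, Descript, Resolution).
val trainSchema = StructType(Seq(
  StructField("Dates", TimestampType),
  StructField("Category", StringType),
  StructField("Descript", StringType),
  StructField("DayOfWeek", StringType),
  StructField("PdDistrict", StringType),
  StructField("Resolution", StringType),
  StructField("Address", StringType),
  StructField("X", DoubleType),
  StructField("Y", DoubleType)))

def loadCsv(sqlContext: SQLContext, path: String, schema: StructType): DataFrame =
  sqlContext.read
    .format("com.databricks.spark.csv") // the spark-csv package
    .option("header", "true")
    .schema(schema)
    .load(path)
```

A call such as loadCsv(sqlContext, trainPath, trainSchema) would then yield the training dataframe.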
As a side note, parsing csv will be part of Apache Spark 2.0 so we won’t have
to rely on another jar anymore.
Basically, we’re left with six features: Dates, DayOfWeek, PdDistrict, Address, X and Y.
Address is unusable as is (23,228 distinct addresses), while X and Y basically form one single feature: the coordinates of the incident.
Let’s see if there are other features which could help our classification.
Along the lines of
DayOfWeek I wanted to see if other time-related features
had any impact on the type of offense being committed.
Intuitively, given the top 10 categories, we would think that some types of crime take place during night hours most of the time such as vehicle thefts or vandalism.
Another trend I wanted to investigate was whether more crimes were being committed during certain years (and months). For example, here is the number of LARCENY/THEFT crimes according to the year the incident occurred:
We clearly see an upward trend in the last few years.
This can be obtained with:
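The query behind this chart is not reproduced here; it presumably amounts to a filtered group-by/count along these lines (train being the training dataframe loaded earlier; the exact code is my reconstruction):

```scala
import org.apache.spark.sql.functions.{col, year}

// Hypothetical reconstruction: count LARCENY/THEFT incidents per year
val larcenyByYear = train
  .filter(col("Category") === "LARCENY/THEFT")
  .groupBy(year(col("Dates")).alias("Year"))
  .count()
  .orderBy("Year")
```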
ASSAULT incidents, on the other hand, occurred the most during the first few years of the time span covered by this dataset:
To sum things up, we need three new features: the hour of the day, the month and the year. Fortunately, Spark comes bundled with a lot of SQL-like utility functions, which were presented in a Databricks blog post a few months back. These will greatly simplify our work:
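The snippet is not shown here; it presumably applies those functions to the Dates column, roughly as follows (the new column names are my own):

```scala
import org.apache.spark.sql.functions.{col, hour, month, year}

// Sketch: derive the three new time features from the Dates timestamp
val withTimeFeatures = train
  .withColumn("HourOfDay", hour(col("Dates")))
  .withColumn("Month", month(col("Dates")))
  .withColumn("Year", year(col("Dates")))
```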
Next up, I wrote a UDF to check if the incident took place during the weekend:
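The UDF body is not reproduced here; its core logic could be as simple as the following sketch (assuming DayOfWeek holds full English day names, which is how the dataset encodes it):

```scala
// Hypothetical core of the weekend UDF: 1 if the day is part of the
// weekend, 0 otherwise (DayOfWeek comes as a full day name)
def isWeekend(dayOfWeek: String): Int =
  if (Set("Saturday", "Sunday").contains(dayOfWeek)) 1 else 0
```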
Then, I wanted to make the Address variable usable. If you have a look at a few addresses in the dataset, you’ll notice that they come in two forms: an intersection of two streets (such as A STREET / B STREET) or a block of a particular street.
Consequently, I introduced two features from this column: AddressType, which indicates whether the incident took place at an intersection or on a particular street, and Street, where I attempted to parse the Address variable down to a single street name; this reduced the cardinality of the original feature by 10x.
Note that the Street variable will only contain the first street when the Address is an intersection. So, it is possible that two addresses containing the same street but represented by different intersections won’t result in the same Street.
For example, given the two addresses A STREET / B STREET and B STREET / C STREET, the resulting Street values will be A STREET and B STREET respectively, even though both intersections involve B STREET.
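The actual parsing code isn’t reproduced here, but given the two address forms described above, a plausible sketch of the Street extraction is:

```scala
// Sketch of the Street extraction described above (my reconstruction):
// - intersections ("A STREET / B STREET") keep only the first street
// - block addresses ("100 Block of A STREET") keep only the street name
def street(address: String): String =
  if (address.contains("/")) address.split("/")(0).trim
  else address.replaceAll("^[0-9]+ Block of ", "").trim
```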
Along the same lines as the HourOfDay feature, I reasoned that it would be interesting to know whether an incident occurred during the day or the night.
To determine this, I used the sunrise-sunset.org API with this script. It basically makes a request to the API for each day present in the dataset to retrieve the times of sunrise and sunset, parses the json-formatted result thanks to circe (which is a great json library for Scala, by the way) and writes it to a file. One important thing to note about this script: a request is made only every five seconds in order not to overload the API.
The resulting dataset can be found in the repo.
Once we have all our sunset and sunrise times, we can load them into a dataframe:
We still have to write a UDF to determine if the incident took place during the night or the day. To do that we just compare our timestamp to the time of sunrise and sunset:
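The UDF itself is not shown; its core comparison can be sketched in plain Scala, assuming times come as HH:mm:ss strings (which compare lexicographically exactly like the times they represent):

```scala
// Hypothetical core of the day/night UDF: an incident is a "Day" incident
// if its time falls between sunrise and sunset; HH:mm:ss strings order
// lexicographically the same way as the times they represent
def dayOrNight(time: String, sunrise: String, sunset: String): String =
  if (time >= sunrise && time <= sunset) "Day" else "Night"
```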
We join with the sunsetDF dataframe in order to benefit from the sunrise and sunset times:
You’ll notice that we don’t directly use the provided Dates variable. This is because sunrise and sunset times are given by the API in the UTC timezone. As a result, we converted Dates to the UTC timezone (TimestampUTC) and extracted its date.
We could imagine that incidents in a few categories, such as VEHICLE THEFT, would occur mostly outdoors, where the weather could have an influence, as opposed to other types of incidents which occur indoors and on which the weather wouldn’t have any impact.
To check this assumption, I assembled a dataset containing the most frequent weather condition and the average temperature of every day in the dataset using wunderground.com. The script can be found here. It works similarly to the sunrise/sunset scraping we just saw, except that wunderground gives us csv instead of json.
The resulting dataset can be found here.
As we’ve done before, we need to turn this dataset into a dataframe:
Next, we join our training and test dataframes with the new weather dataframe. As a result, we get two new features: the day’s most frequent weather condition and its average temperature.
Finally, I wanted to see what I could do with the latitude and longitude variables. I came up with the idea of finding the neighborhood where the incident occurred, thinking that particular types of crimes are more likely to happen in particular neighborhoods.
To get the San Francisco neighborhoods as latitude/longitude polygons, I used the Zillow API, which fitted the bill perfectly, providing the neighborhoods of California as a shapefile. I used a bit of R to turn the shapefile into a json file containing the polygons as WKT, which can be found here.
Now that we have our SF neighborhoods as polygons, we still have to determine in which one of these neighborhoods every incident occurred. For this task, I used the ESRI Geometry API, which lets you do all kinds of spatial data processing and, in our case, check whether a point (an incident) is inside a polygon (a neighborhood).
First up, we’ll need a function which turns WKT strings into ESRI geometries:
Next, we need a function which tells us if a geometry contains another:
As you may have noticed, the API is a bit cumbersome but it’s really well-documented.
Now that we have our neighborhoods and our utility functions, we can write a UDF which will tell us in which neighborhood an incident occurred:
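The UDF relies on the ESRI contains check; as a standalone illustration of what that check computes (a ray-casting stand-in written for this post, not the ESRI API itself), point-in-polygon looks like this:

```scala
// Ray-casting point-in-polygon test, a stand-in for ESRI's contains():
// cast a horizontal ray from (x, y) and count polygon edge crossings;
// an odd count means the point is inside. Vertices are (lon, lat) pairs.
def inPolygon(polygon: Seq[(Double, Double)], x: Double, y: Double): Boolean = {
  var inside = false
  var j = polygon.length - 1
  for (i <- polygon.indices) {
    val (xi, yi) = polygon(i)
    val (xj, yj) = polygon(j)
    if ((yi > y) != (yj > y) && x < (xj - xi) * (y - yi) / (yj - yi) + xi)
      inside = !inside
    j = i
  }
  inside
}
```

The real neighborhoods are of course far more complex polygons, which is why a battle-tested library like ESRI's is the better choice.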
We can now add those features to both our datasets:
Now that we have all our features, we can build our machine learning pipeline.
In order to be processed by our classifier, our categorical features need to be indexed; this is done through StringIndexer. However, we still have one difficulty to sort through: unfortunately, some addresses are only present in the test dataset. Hence, we’ll have to index our categorical variables with all the data:
Since our label variable is also categorical, we’ll have to index it as well:
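The indexing code is elided here; conceptually, StringIndexer maps each distinct value to a double index ordered by descending frequency. A simplified plain-Scala stand-in (not the Spark class itself) makes that behaviour concrete:

```scala
// Simplified stand-in for StringIndexer: the most frequent value gets
// index 0.0, and so on (ties broken alphabetically here for determinism)
def fitIndex(values: Seq[String]): Map[String, Double] =
  values.groupBy(identity)
    .toSeq
    .sortBy { case (v, occurrences) => (-occurrences.size, v) }
    .zipWithIndex
    .map { case ((v, _), i) => v -> i.toDouble }
    .toMap
```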
Then, we can assemble all our features into a single vector column as required by Spark’s ML algorithms:
Finally, we can define our classifier:
I chose a random forest classifier because it is one of the only multiclass classifiers available in Spark (OneVsRest coupled with a binary classifier wasn’t really an option on my computer).
A couple notes regarding the parameters:
Because we indexed our label variable, we need to get back our original labels once predictions are made; this is done with IndexToString:
We can finally construct our pipeline:
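The assembly code is not shown above; assuming pre-fitted indexers (trained on all the data, as discussed earlier), a VectorAssembler and the random forest, a sketch might be:

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{IndexToString, StringIndexerModel, VectorAssembler}

// Sketch only: chain the pre-fitted indexers, the feature assembler,
// the classifier and the label back-converter into a single pipeline
def buildPipeline(indexers: Array[StringIndexerModel],
                  labelIndexer: StringIndexerModel,
                  assembler: VectorAssembler,
                  rf: RandomForestClassifier): Pipeline = {
  val labelConverter = new IndexToString()
    .setInputCol("prediction")
    .setOutputCol("predictedLabel")
    .setLabels(labelIndexer.labels)
  val stages: Array[PipelineStage] = indexers ++ Array(assembler, rf, labelConverter)
  new Pipeline().setStages(stages)
}
```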
We can now find the best model through cross validation. In this example, cross validation is a bit artificial because I was limited by my computer in terms of processing power.
To run cross validation, you’ll have to set up three different things: a grid of parameters to explore, an evaluator for the metric to optimize, and the cross validator itself:
We can finally train our model:
and make predictions on the test set:
A shortcoming we have to face when using cross validation is that, to find the best params, you have to look through the INFO-level logs of your Spark application, which tend to be really noisy. To circumvent this, you can disable INFO-level logs and retrieve the best parameters directly from the fitted model:
Another nifty trick I learned is how to retrieve the importance of every feature in your model:
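The trick itself is elided above; it consists of pairing the importance vector exposed by the fitted forest (featureImportances) with the names fed to the assembler. The pairing is plain Scala (the names and scores in the example below are made up for illustration):

```scala
// Pair each feature name with its importance score and rank them.
// In the real pipeline, the names come from the VectorAssembler's
// input columns and the scores from the model's featureImportances.
def rankFeatures(names: Seq[String], scores: Seq[Double]): Seq[(String, Double)] =
  names.zip(scores).sortBy { case (_, score) => -score }
```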
Last step: we have to format our results according to the Kaggle instructions, which require one column per Category, each filled with 0 except the one corresponding to the predicted category, which is filled with a 1.
First, we need a list of all labels sorted alphabetically:
Next, we need a function which will turn our predicted Category into a sequence of zeros with a one at the right spot:
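That function is not reproduced here; assuming the alphabetically-sorted label list from the previous step, it boils down to:

```scala
// Turn a predicted category into indicator columns: all zeros except a
// single one at the predicted category's position in the label list
def toIndicators(labels: Seq[String], predicted: String): Seq[Int] =
  labels.map(label => if (label == predicted) 1 else 0)
```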
Then, we can create our result dataframe:
Basically, we create the schema we want and recreate a dataframe from the predictions which fits this schema.
Finally, we can write it to a file:
I wrote a little script which takes care of the file handling and submission to Spark, which you can find here. It just needs spark-submit to be on your path.
I hope this post demonstrated the power of Apache Spark, and in particular how easy it is to manipulate features and create new ones. However, there are a few things which could have been done differently or better.