This post will detail how I built my entry to the Kaggle San Francisco crime classification competition using Apache Spark and the new ML library.
We’ll be exploring the San Francisco crime dataset, which contains crimes that took place between 2003 and 2015, as detailed on the Kaggle competition page.
You can find the code for this post on Github.
The dataset given by Kaggle is split into two archives: `train.csv.zip` and `test.csv.zip`.
The training and test datasets contain a few features:

- `Dates`: timestamp of the crime incident in the PST timezone
- `Descript`: detailed description of the crime incident (unfortunately this is only available in the training dataset so it’ll be pretty much useless)
- `DayOfWeek`: the day of the week of the crime incident
- `PdDistrict`: name of the police department district which handled the crime incident
- `Resolution`: how the crime incident was resolved (it’s also only in the training dataset)
- `Address`: the approximate street address of the incident
- `X`: longitude of the incident
- `Y`: latitude of the incident

The goal of the competition is to predict the `Category` variable, which corresponds to the type of offense committed. For example, here are the ten most common offenses:
| Category | count |
|---|---|
| LARCENY/THEFT | 174900 |
| OTHER OFFENSES | 126182 |
| NON-CRIMINAL | 92304 |
| ASSAULT | 76876 |
| DRUG/NARCOTIC | 53971 |
| VEHICLE THEFT | 53781 |
| VANDALISM | 44725 |
| WARRANTS | 42214 |
| BURGLARY | 36755 |
| SUSPICIOUS OCC | 31414 |
Since the data is in csv format, we’ll use spark-csv, which will parse our csv files and give us back `DataFrame`s:
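Here is a sketch of what that loading function could look like (the function name and the exact schema are illustrative):

```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types._

// Hypothetical loader: reads both csv files through spark-csv with an explicit schema
def loadData(trainPath: String, testPath: String, sqlContext: SQLContext): (DataFrame, DataFrame) = {
  val fields = Array(
    StructField("Id", LongType),           // only in the test set
    StructField("Dates", TimestampType),
    StructField("Category", StringType),   // only in the training set
    StructField("Descript", StringType),   // only in the training set
    StructField("DayOfWeek", StringType),
    StructField("PdDistrict", StringType),
    StructField("Resolution", StringType), // only in the training set
    StructField("Address", StringType),
    StructField("X", DoubleType),
    StructField("Y", DoubleType)
  )

  val trainSchema = StructType(fields.filterNot(_.name == "Id"))
  val testSchema  = StructType(fields.filterNot(f => Set("Category", "Descript", "Resolution")(f.name)))

  def readCsv(path: String, schema: StructType): DataFrame =
    sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .schema(schema)
      .load(path)

  (readCsv(trainPath, trainSchema), readCsv(testPath, testSchema))
}
```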
This function takes the paths to the uncompressed training and test files and a `sqlContext`, which will have been initialized beforehand like so:
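The pre-2.0 setup could look something like this:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// A SparkContext wrapped in a SQLContext, as was customary before Spark 2.0's SparkSession
val conf = new SparkConf().setAppName("SFCrime")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
```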
I should point out that you can let Spark infer the schema. However, I tend to always specify the type of every field to make sure every value of every variable is of the expected type.
We can now load our data:
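Using the hypothetical `loadData` function sketched above (the paths are placeholders):

```scala
// Load the unzipped csv files into two dataframes
val (trainDF, testDF) = loadData("data/train.csv", "data/test.csv", sqlContext)
```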
As a side note, csv parsing will be built into Apache Spark 2.0, so we won’t have to rely on an external jar anymore.
Basically, we’re left with six features:

- `Dates`
- `DayOfWeek`
- `Address`, which is unusable as is (23,228 distinct addresses)
- `PdDistrict`
- `X` and `Y`, which are basically one single feature: the coordinates of the incident

Let’s see if there are other features which could help our classification.
Along the lines of `DayOfWeek`, I wanted to see if other time-related features had any impact on the type of offense being committed.

Intuitively, given the top 10 categories, we would expect some types of crime, such as vehicle theft or vandalism, to take place mostly during night hours.
Another trend I wanted to investigate was whether there were more crimes being
committed during certain years (and months). For example, here is the
distribution of LARCENY/THEFT
crimes according to the year the incident
occurred:
| Year | count |
|---|---|
| 2014 | 18901 |
| 2013 | 18152 |
| 2012 | 15639 |
| 2006 | 13798 |
| 2011 | 13084 |
We clearly see an upward trend in the last few years.
This can be obtained with:
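Something along these lines, assuming `trainDF` is the training dataframe loaded earlier:

```scala
import org.apache.spark.sql.functions._

// Count LARCENY/THEFT incidents per year of occurrence
trainDF
  .filter(col("Category") === "LARCENY/THEFT")
  .groupBy(year(col("Dates")).alias("Year"))
  .count()
  .orderBy(col("count").desc)
  .show(5)
```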
Conversely, `ASSAULT`s occurred the most during the first few years of the time span covered by this dataset:
| Year | count |
|---|---|
| 2003 | 6555 |
| 2004 | 6467 |
| 2006 | 6364 |
| 2008 | 6327 |
| 2013 | 6280 |
To sum things up, we need three new features: the hour of the day, the month, and the year. Fortunately, Spark comes bundled with a lot of SQL-like utility functions, which were presented in a Databricks blog post a few months back. This will greatly simplify our work:
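For instance, a small helper relying on Spark’s built-in `hour`, `month` and `year` functions (the output column names are illustrative):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// Derive the three time-related features from the Dates timestamp
def enrichTime(df: DataFrame): DataFrame =
  df.withColumn("HourOfDay", hour(col("Dates")))
    .withColumn("Month", month(col("Dates")))
    .withColumn("Year", year(col("Dates")))
```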
Next up, I wrote a UDF to check if the incident took place during the weekend:
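A possible version of such a UDF:

```scala
import org.apache.spark.sql.functions.{col, udf}

// 1 if the incident happened on a Saturday or a Sunday, 0 otherwise
val weekendUDF = udf { dayOfWeek: String =>
  dayOfWeek match {
    case "Saturday" | "Sunday" => 1
    case _                     => 0
  }
}

val trainWithWeekend = trainDF.withColumn("Weekend", weekendUDF(col("DayOfWeek")))
```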
Then, I wanted to make the `Address` variable usable. If you have a look at a few addresses in the dataset, you’ll notice that they come in two forms: either an intersection of two streets or an address on a single street.
Consequently, I introduced two features from this column:

- `AddressType`, which indicates whether the incident took place at an intersection or on a particular street
- `Street`, where I attempted to parse the `Address` variable down to a single street name; this reduced the cardinality of the original feature by 10x

Unfortunately, the `Street` variable will only contain the first address (alphabetically) if `Address` is an intersection. So, it is possible that two addresses containing the same street, both represented by intersections, won’t result in the same street.

For example, given the two `Address` values `A STREET / B STREET` and `B STREET / C STREET`, the resulting `Street`s will be `A STREET` and `B STREET`.
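Here is a sketch of how these two features could be derived, assuming intersections are written with a `/` and block addresses as `<number> Block of <street>`:

```scala
import org.apache.spark.sql.functions.udf

// Intersections contain a "/" separating the two streets
val addressTypeUDF = udf { address: String =>
  if (address.contains("/")) "Intersection" else "Street"
}

// Keep a single street name: the first street of an intersection,
// or the street part of a "<number> Block of <street>" address
val streetUDF = udf { address: String =>
  if (address.contains("/")) address.split("/").head.trim
  else address.replaceAll("\\d+ Block of ", "").trim
}
```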
Along the same lines as the `HourOfDay` feature, I reasoned that it would be interesting to see whether an incident occurred during the day or the night.
To determine this, I used the sunrise-sunset.org API with this script. It makes a request to the API for each day present in the dataset to retrieve the times of sunrise and sunset, parses the json-formatted result thanks to circe (which is a great json library for Scala, by the way) and writes it to a file. One important thing to note about this script: a request is only made every five seconds in order not to overload the API.
The resulting dataset can be found in the repo.
Once we have all our sunset and sunrise times, we can load them into a dataframe:
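The exact layout of the scraped file isn’t shown here, but loading it could look like this (the column names and path are assumptions):

```scala
import org.apache.spark.sql.types._

// One row per date with the corresponding sunrise and sunset times
val sunsetSchema = StructType(Array(
  StructField("Date", StringType),
  StructField("Sunrise", StringType),
  StructField("Sunset", StringType)
))

val sunsetDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(sunsetSchema)
  .load("data/sunsetrise.csv") // placeholder path
```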
We still have to write a UDF to determine if the incident took place during the night or the day. To do that we just compare our timestamp to the time of sunrise and sunset:
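A minimal sketch of such a UDF, assuming the incident time (already converted to UTC) comes in as a `HH:mm:ss` string and that the API returns times like `7:27:02 AM`:

```scala
import java.text.SimpleDateFormat
import java.util.Locale
import org.apache.spark.sql.functions.udf

// 1 if the incident happened before sunrise or after sunset, 0 otherwise
val nightUDF = udf { (timeUTC: String, sunrise: String, sunset: String) =>
  val timeFormat = new SimpleDateFormat("HH:mm:ss", Locale.ENGLISH)
  val sunFormat  = new SimpleDateFormat("h:mm:ss a", Locale.ENGLISH)
  val time = timeFormat.parse(timeUTC)
  if (time.before(sunFormat.parse(sunrise)) || time.after(sunFormat.parse(sunset))) 1 else 0
}
```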
We join with the `sunsetDF` dataframe in order to benefit from the sunrise and sunset columns.
You’ll notice that we don’t directly use the provided `Dates` variable. This is because sunrise/sunset times are given by the API in the UTC timezone. As a result, we converted `Dates` to the UTC timezone (`TimestampUTC`) and extracted its date (`Date`).
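Putting the join and the UDF together might look like this, assuming a hypothetical `TimeUTC` column holding the UTC time of day as a string:

```scala
import org.apache.spark.sql.functions.col

// Join on the day of the incident and flag night-time incidents
val trainWithNight = trainDF
  .join(sunsetDF, "Date")
  .withColumn("Night", nightUDF(col("TimeUTC"), col("Sunrise"), col("Sunset")))
```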
We could imagine that incidents in a few categories, such as `VEHICLE THEFT`, occur mostly outdoors, where the weather could have an influence, as opposed to other types of incidents which occur indoors and on which the weather wouldn’t have any impact.
To check this assumption, I assembled a dataset containing the most frequent weather condition and the average temperature for every day in the dataset using wunderground.com. The script can be found here. It works similarly to the sunrise/sunset scraping we just saw, except that wunderground gives us csv instead of json.
The resulting dataset can be found here.
As we’ve done before, we need to turn this dataset into a dataframe:
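Same approach as for the sunrise/sunset data (again, the column names and path are assumptions):

```scala
import org.apache.spark.sql.types._

// One row per date with the dominant weather condition and the mean temperature in Celsius
val weatherSchema = StructType(Array(
  StructField("Date", StringType),
  StructField("weather", StringType),
  StructField("temperatureC", DoubleType)
))

val weatherDF = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .schema(weatherSchema)
  .load("data/weather.csv") // placeholder path
```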
Next, we join our training and test dataframes with the new `weatherDF`:
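Assuming both sides carry the `Date` column derived earlier, the join is a one-liner:

```scala
// Bring the weather and temperatureC columns alongside each incident
val trainWithWeather = trainDF.join(weatherDF, "Date")
val testWithWeather  = testDF.join(weatherDF, "Date")
```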
As a result, we get two new features: `weather` and `temperatureC`.
Finally, I wanted to see what I could do with the latitude and longitude variables. I came up with the idea of trying to find the neighborhood where the incident occurred, thinking that particular types of crime are more likely to happen in particular neighborhoods.
To find San Francisco neighborhoods as latitude/longitude polygons, I used the Zillow API, which fitted the bill perfectly by providing neighborhoods for California as a shapefile. I used a bit of R to turn the shapefile into a json file containing my polygons as WKT, which can be found here.
Now that we have our SF neighborhoods as polygons, we still have to determine in which one of these neighborhoods each incident occurred. For this task, I used the ESRI geometry API, which lets you do all kinds of spatial data processing; in our case, checking whether a point (corresponding to an incident) is inside a polygon (corresponding to a neighborhood).
First up, we’ll need functions which turn WKTs into ESRI geometries (i.e. `Point` and `Polygon`):
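A possible version using `GeometryEngine.geometryFromWkt`:

```scala
import com.esri.core.geometry.{Geometry, GeometryEngine, Point, Polygon}

// Parse a WKT string such as "POINT (-122.42 37.77)" into an ESRI Point
def wktToPoint(wkt: String): Point =
  GeometryEngine.geometryFromWkt(wkt, 0, Geometry.Type.Point).asInstanceOf[Point]

// Parse a WKT polygon (one of our neighborhoods) into an ESRI Polygon
def wktToPolygon(wkt: String): Polygon =
  GeometryEngine.geometryFromWkt(wkt, 0, Geometry.Type.Polygon).asInstanceOf[Polygon]
```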
Next, we need a function which tells us if a geometry contains another:
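Something like the following, relying on `GeometryEngine.contains` with the WGS84 spatial reference (the one used by plain latitude/longitude coordinates):

```scala
import com.esri.core.geometry.{GeometryEngine, Point, Polygon, SpatialReference}

// WGS84, i.e. SRID 4326
val wgs84 = SpatialReference.create(4326)

// True if the polygon (a neighborhood) contains the point (an incident)
def contains(polygon: Polygon, point: Point): Boolean =
  GeometryEngine.contains(polygon, point, wgs84)
```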
As you may have noticed, the API is a bit cumbersome but it’s really well-documented.
Now that we have our neighborhoods and our utility functions, we can write a UDF which will tell us in which neighborhood an incident occurred:
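A sketch of this UDF, assuming the neighborhoods have been parsed into a `Seq[(String, Polygon)]` with the helpers above:

```scala
import com.esri.core.geometry.{Point, Polygon}
import org.apache.spark.sql.functions.udf

// In practice built from the json of WKT polygons with wktToPolygon
val neighborhoods: Seq[(String, Polygon)] = Seq.empty

// Return the name of the first neighborhood containing the incident, if any
val neighborhoodUDF = udf { (lon: Double, lat: Double) =>
  val incident = new Point(lon, lat)
  neighborhoods
    .find { case (_, polygon) => contains(polygon, incident) }
    .map { case (name, _) => name }
    .getOrElse("UNKNOWN")
}
```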
We can now add those features to both our datasets:
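For example:

```scala
import org.apache.spark.sql.functions.col

// X is the longitude of the incident and Y its latitude
val trainWithHood = trainDF.withColumn("Neighborhood", neighborhoodUDF(col("X"), col("Y")))
val testWithHood  = testDF.withColumn("Neighborhood", neighborhoodUDF(col("X"), col("Y")))
```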
Now that we have all our features, we can build our machine learning pipeline.
In order to be processed by our classifier, our categorical features need to be indexed; this is done through `StringIndexer`. However, we still have one difficulty to sort out: unfortunately, some addresses are only present in the test dataset. Hence, we’ll have to index our categorical variables with all the data:
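A sketch of this indexing step, assuming `trainDF` and `testDF` now carry all the engineered columns (the exact list of categorical features is illustrative):

```scala
import org.apache.spark.ml.feature.StringIndexer

val categoricalFeatures = Seq("DayOfWeek", "PdDistrict", "AddressType", "Street", "weather", "Neighborhood")

// Fit every indexer on the union of the training and test data so that
// addresses only present in the test set still get an index
val indexers = categoricalFeatures.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed")
    .fit(trainDF.select(colName).unionAll(testDF.select(colName)))
}
```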
Since our label variable is also categorical, we’ll have to index it as well:
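For example:

```scala
import org.apache.spark.ml.feature.StringIndexer

// Category only exists in the training data, so we fit on trainDF alone
val labelIndexer = new StringIndexer()
  .setInputCol("Category")
  .setOutputCol("CategoryIndexed")
  .fit(trainDF)
```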
Then, we can assemble all our features into a single vector column as required by Spark’s ML algorithms:
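Something along these lines (the exact feature list depends on the columns built earlier and is illustrative):

```scala
import org.apache.spark.ml.feature.VectorAssembler

// Gather every numerical and indexed categorical feature into a single vector column
val assembler = new VectorAssembler()
  .setInputCols(Array(
    "HourOfDay", "Month", "Year", "Weekend", "Night", "temperatureC",
    "DayOfWeekIndexed", "PdDistrictIndexed", "AddressTypeIndexed",
    "StreetIndexed", "weatherIndexed", "NeighborhoodIndexed"))
  .setOutputCol("features")
```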
Finally, we can define our classifier:
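For instance (the parameter values here are purely illustrative):

```scala
import org.apache.spark.ml.classification.RandomForestClassifier

// maxBins must be at least as large as the number of categories of the
// most granular categorical feature
val randomForest = new RandomForestClassifier()
  .setLabelCol("CategoryIndexed")
  .setFeaturesCol("features")
  .setMaxDepth(10)
  .setMaxBins(2500)
```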
I chose a random forest classifier because it is one of the only multiclass classifiers available in Spark (`OneVsRest` coupled with `LogisticRegression` wasn’t really an option on my computer).
A note regarding the parameters: `maxBins` has to be at least as large as the number of distinct values of our most granular categorical feature (the `Street` column).

Because we indexed our label variable `Category` into `CategoryIndexed`, we’ll need to get back our original labels; this is done with `IndexToString`:
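For example, reusing the labels from the fitted `labelIndexer`:

```scala
import org.apache.spark.ml.feature.IndexToString

// Map the numeric predictions back to the original Category strings
val indexToString = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedCategory")
  .setLabels(labelIndexer.labels)
```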
We can finally construct our pipeline:
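Chaining everything defined so far:

```scala
import org.apache.spark.ml.{Pipeline, PipelineStage}

// Indexing, vector assembly, classification and label decoding, in that order
val stages = indexers.toArray[PipelineStage] ++
  Array[PipelineStage](labelIndexer, assembler, randomForest, indexToString)

val pipeline = new Pipeline().setStages(stages)
```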
We can now find the best model through cross validation. In this example, cross validation is a bit artificial because I was limited by my computer in terms of processing power.
To run cross validation, you’ll have to setup three different things:
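A minimal sketch of such a setup: a grid of parameters to explore, an evaluator to score each model, and the `CrossValidator` itself tying them to our pipeline. Note that Kaggle’s actual metric (multiclass log loss) isn’t directly available here, so a `MulticlassClassificationEvaluator` is used as a stand-in; all values are illustrative.

```scala
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

// 1. the grid of parameter combinations to try
val paramGrid = new ParamGridBuilder()
  .addGrid(randomForest.maxDepth, Array(10, 20))
  .addGrid(randomForest.numTrees, Array(10, 20))
  .build()

// 2. how each model is scored on the held-out fold
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("CategoryIndexed")
  .setPredictionCol("prediction")

// 3. the cross validator itself
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEstimatorParamMaps(paramGrid)
  .setEvaluator(evaluator)
  .setNumFolds(3)
```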
We can finally train our model:
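For instance:

```scala
// Run cross validation over the whole pipeline on the training data
val cvModel = cv.fit(trainDF)
```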
and make predictions on the test set:
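For example:

```scala
// Apply the best model found to the test data
val predictions = cvModel.transform(testDF)
```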
A shortcoming we have to face when using cross validation is that, to find the best parameters, you have to look at the `INFO`-level logs of your Spark application, which tend to be really noisy.

To circumvent this, you can disable `INFO`-level logs and retrieve the metrics directly:
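One way to do this, since the fitted `CrossValidatorModel` keeps the average metric computed for each parameter combination:

```scala
// Silence the noisy INFO logs...
sc.setLogLevel("WARN")

// ...and print the average metric obtained for each parameter combination
cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .foreach { case (params, metric) => println(s"$params -> $metric") }
```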
Another nifty trick I learned is how to retrieve the importance of every feature in your model:
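The idea is to dig the trained forest out of the best pipeline and pair its `featureImportances` vector with the assembler’s input columns; a sketch:

```scala
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.classification.RandomForestClassificationModel

// Extract the trained random forest from the best pipeline
val bestPipeline = cvModel.bestModel.asInstanceOf[PipelineModel]
val forest = bestPipeline.stages
  .collect { case m: RandomForestClassificationModel => m }
  .head

// Print the features sorted by decreasing importance
assembler.getInputCols
  .zip(forest.featureImportances.toArray)
  .sortBy { case (_, importance) => -importance }
  .foreach { case (feature, importance) => println(s"$feature: $importance") }
```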
Last step: we have to format our results according to the Kaggle instructions, which are to have one column per `Category`, each filled with 0 except for the one corresponding to the predicted `Category`, which is filled with a 1.
First, we need a list of all labels sorted alphabetically:
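For example:

```scala
// Every distinct category, sorted alphabetically as the submission format requires
val labels = trainDF
  .select("Category")
  .distinct()
  .collect()
  .map(_.getString(0))
  .sorted
```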
Next, we need a function which will turn our predicted `Category` into a sequence of zeros and a one at the right spot:
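A simple version of such a function:

```scala
// 1 at the index of the predicted category, 0 everywhere else
def toOneHot(predicted: String, labels: Seq[String]): Seq[Int] =
  labels.map(label => if (label == predicted) 1 else 0)
```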
Then, we can create our result dataframe:
Basically, we create our desired schema and recreate a dataframe from the existing `predictions` which fits this schema.
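A sketch of this step, assuming the test set’s `Id` column was loaded as a long:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// An Id column followed by one 0/1 column per category
val resultSchema = StructType(
  StructField("Id", LongType) +: labels.map(label => StructField(label, IntegerType))
)

// Turn each prediction into a row matching that schema
val resultRDD = predictions.select("Id", "predictedCategory").map { row =>
  Row.fromSeq(row.getLong(0) +: toOneHot(row.getString(1), labels))
}

val resultDF = sqlContext.createDataFrame(resultRDD, resultSchema)
```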
Finally, we can write it to a file:
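spark-csv can also write csv, so a possible last step is:

```scala
// coalesce(1) to end up with a single csv part file to submit
resultDF
  .coalesce(1)
  .write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("submission") // placeholder output directory
```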
I wrote a little script, which you can find here, that takes care of the file handling and of submitting the application to Spark. It just needs `spark-submit` to be on your path.
I hope this post demonstrates the power of Apache Spark and, in particular, how easy it is to manipulate features and create new ones.
However, there are a few things which could have been done differently/better: