This post will detail how I built my entry to the Kaggle San Francisco crime classification competition using Apache Spark and the new ML library.
We’ll be exploring the San Francisco crime dataset, which contains incidents that took place between 2003 and 2015, as detailed on the Kaggle competition page.
You can find the code for this post on Github.
The dataset given by Kaggle is split into two archives: train.csv.zip and test.csv.zip.
The training and test datasets contain a few features:

- Dates: timestamp of the crime incident in the PST timezone
- Descript: detailed description of the crime incident (unfortunately this is only available in the training dataset so it’ll be pretty much useless)
- DayOfWeek: the day of the week of the crime incident
- PdDistrict: name of the police department district which handled the crime incident
- Resolution: how the crime incident was resolved (also only available in the training dataset)
- Address: the approximate street address of the incident
- X: longitude of the incident
- Y: latitude of the incident

The goal of the competition is to predict the Category variable, which corresponds to the type of offense committed. For example, here are the ten most common offenses:
Category | count |
---|---|
LARCENY/THEFT | 174900 |
OTHER OFFENSES | 126182 |
NON-CRIMINAL | 92304 |
ASSAULT | 76876 |
DRUG/NARCOTIC | 53971 |
VEHICLE THEFT | 53781 |
VANDALISM | 44725 |
WARRANTS | 42214 |
BURGLARY | 36755 |
SUSPICIOUS OCC | 31414 |
Since the data is in csv format, we’ll use spark-csv which will parse our csv and give us back DataFrames:
val csvFormat = "com.databricks.spark.csv"
def loadData(
trainFile: String,
testFile: String,
sqlContext: SQLContext
): (DataFrame, DataFrame) = {
val schemaArray = Array(
StructField("Id", LongType),
StructField("Dates", TimestampType),
StructField("Category", StringType), // target variable
StructField("Descript", StringType),
StructField("DayOfWeek", StringType),
StructField("PdDistrict", StringType),
StructField("Resolution", StringType),
StructField("Address", StringType),
StructField("X", DoubleType),
StructField("Y", DoubleType)
)
val trainSchema = StructType(schemaArray.filterNot(_.name == "Id"))
val testSchema = StructType(schemaArray.filterNot { p =>
Seq("Category", "Descript", "Resolution") contains p.name
})
val trainDF = sqlContext.read
.format(csvFormat)
.option("header", "true")
.schema(trainSchema)
.load(trainFile)
val testDF = sqlContext.read
.format(csvFormat)
.option("header", "true")
.schema(testSchema)
.load(testFile)
(trainDF, testDF)
}
This function takes the paths to the uncompressed training and test files and
a sqlContext
which will have been initialized beforehand like so:
val sc = new SparkContext(new SparkConf().setAppName("SFCrime"))
val sqlContext = new SQLContext(sc)
I should point out that you can let Spark infer the schema. However, I tend to always specify the type of every field to make sure every value of every variable is of the expected type.
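For reference, letting spark-csv infer the schema would look something like this (a minimal sketch; the inferSchema option triggers an extra pass over the data):

val inferredTrainDF = sqlContext.read
  .format(csvFormat)
  .option("header", "true")
  .option("inferSchema", "true") // let spark-csv guess the type of each column
  .load(trainFile)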
We can now load our data:
val (rawTrainDF, rawTestDF) = loadData(trainFile, testFile, sqlContext)
As a side note, parsing csv will be part of Apache Spark 2.0 so we won’t have
to rely on another jar anymore.
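For the record, in Spark 2.0 the loading code should boil down to something like this (a sketch based on the 2.0 API, where SparkSession replaces SQLContext and csv support is built in):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SFCrime").getOrCreate()
val trainDF = spark.read
  .option("header", "true")
  .schema(trainSchema)
  .csv(trainFile)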
Basically, we’re left with six features:

- Dates
- DayOfWeek
- Address, which is unusable as is (23,228 distinct addresses)
- PdDistrict
- X and Y, which are basically one single feature: the coordinates of the incident

Let’s see if there are other features which could help our classification.
Along the lines of DayOfWeek
I wanted to see if other time-related features
had any impact on the type of offense being committed.
Intuitively, given the top 10 categories, we would think that some types of crime, such as vehicle theft or vandalism, take place mostly during night hours.
Another trend I wanted to investigate was whether there were more crimes being
committed during certain years (and months). For example, here is the
distribution of LARCENY/THEFT
crimes according to the year the incident
occurred:
Year | count |
---|---|
2014 | 18901 |
2013 | 18152 |
2012 | 15639 |
2006 | 13798 |
2011 | 13084 |
We clearly see an upward trend in the last few years.
This can be obtained with:
enrichedTrainDF
.filter(enrichedTrainDF("Category") === "LARCENY/THEFT")
.groupBy("Year")
.count()
.sort(desc("count"))
.show(5)
Conversely, ASSAULT incidents occurred the most during the first few years of the time span covered by this dataset:
Year | count |
---|---|
2003 | 6555 |
2004 | 6467 |
2006 | 6364 |
2008 | 6327 |
2013 | 6280 |
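This table was obtained with the same kind of query as before, only the category filter changes:

enrichedTrainDF
  .filter(enrichedTrainDF("Category") === "ASSAULT")
  .groupBy("Year")
  .count()
  .sort(desc("count"))
  .show(5)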
To sum things up, we need three new features: the hour of day, the month and the year. Fortunately, Spark comes bundled with a lot of sql-like utility functions which were presented in a Databricks blogpost a few months back. This will greatly simplify our work:
val enrichTime = (df: DataFrame) => df
  .withColumn("HourOfDay", hour(col("Dates")))
  .withColumn("Month", month(col("Dates")))
  .withColumn("Year", year(col("Dates")))
Next up, I wrote a UDF to check if the incident took place during the weekend:
val enrichWeekend = (df: DataFrame) => {
def weekendUDF = udf { (dayOfWeek: String) =>
dayOfWeek match {
case _ @ ("Friday" | "Saturday" | "Sunday") => 1
case _ => 0
}
}
df.withColumn("Weekend", weekendUDF(col("DayOfWeek")))
}
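As a side note, the same column could also be derived without a UDF by using the built-in when and isin functions (an equivalent sketch, not what the code above does):

import org.apache.spark.sql.functions.when

df.withColumn(
  "Weekend",
  when(col("DayOfWeek").isin("Friday", "Saturday", "Sunday"), 1).otherwise(0)
)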
Then, I wanted to make the Address variable usable. If you have a look at a few addresses in the dataset, you’ll notice that they come in two forms: either a block address (a street number followed by a street name, e.g. 100 Block of A STREET) or an intersection of two streets (e.g. A STREET / B STREET).

Consequently, I introduced two features from this column:

- AddressType, which indicates whether the incident took place at an intersection or on a particular street
- Street, where I attempted to parse the Address variable down to a single street name; this reduced the cardinality of the original feature by 10x

Unfortunately, the Street variable will only contain the first street (alphabetically) if Address is an intersection. So, it is possible that two addresses containing the same street, both represented by intersections, won’t result in the same street. For example, given the two Address values A STREET / B STREET and B STREET / C STREET, the resulting Street will be A STREET and B STREET respectively.
val enrichAddress = (df: DataFrame) => {
def addressTypeUDF = udf { (address: String) =>
if (address contains "/") "Intersection"
else "Street"
}
val streetRegex = """\d{1,4} Block of (.+)""".r
val intersectionRegex = """(.+) / (.+)""".r
def addressUDF = udf { (address: String) =>
streetRegex findFirstIn address match {
case Some(streetRegex(s)) => s
case None => intersectionRegex findFirstIn address match {
case Some(intersectionRegex(s1, s2)) => if (s1 < s2) s1 else s2
case None => address
}
}
}
df
.withColumn("AddressType", addressTypeUDF(col("Address")))
.withColumn("Street", addressUDF(col("Address")))
}
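To make the parsing rules concrete, here is a quick sanity check of the two regexes outside of Spark (the sample addresses are made up, following the two forms described above):

val streetRegex = """\d{1,4} Block of (.+)""".r
val intersectionRegex = """(.+) / (.+)""".r

// a block address is reduced to its street name
"100 Block of A STREET" match { case streetRegex(s) => s }                          // "A STREET"
// an intersection is reduced to its alphabetically first street
"A STREET / B STREET" match { case intersectionRegex(a, b) => if (a < b) a else b } // "A STREET"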
Along the same lines as the HourOfDay feature, I reasoned that it would be interesting to know whether an incident occurred during the day or during the night.

To get this information, I used the sunrise-sunset.org API with this script. It basically makes a request to the API for each day present in the dataset to retrieve the times of sunrise and sunset, parses the json-formatted result thanks to circe (which is a great json library for Scala, by the way) and writes it to a file. One important thing to note about this script: a request is made every five seconds in order not to overload the API.
The resulting dataset can be found in the repo.
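The script itself isn’t reproduced here; a rough sketch of the loop it performs could look like the following (the endpoint, its parameters and the coordinates are assumptions, and the circe parsing is elided):

import scala.io.Source

// approximate coordinates of San Francisco (assumption)
val lat = 37.77
val lng = -122.42

// every date present in the dataset, formatted as yyyy-MM-dd
val dates: Seq[String] = Seq("2003-01-01", "2003-01-02" /* ... */)

dates.foreach { date =>
  val url = s"https://api.sunrise-sunset.org/json?lat=$lat&lng=$lng&date=$date"
  val json = Source.fromURL(url).mkString // json payload containing the sunrise and sunset times
  // ... parse `json` with circe and append the result to the output file ...
  Thread.sleep(5000)                      // one request every five seconds to avoid overloading the API
}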
Once we have all our sunset and sunrise times, we can load them into a dataframe:
val sunsetDF = {
val rdd = sc.wholeTextFiles(sunsetFile).map(_._2)
sqlContext.read.json(rdd)
}
We still have to write a UDF to determine if the incident took place during the night or the day. To do that we just compare our timestamp to the time of sunrise and sunset:
def enrichDayOrNight(sunsetDF: DataFrame)(df: DataFrame): DataFrame = {
  def dayOrNightUDF = udf { (timestampUTC: String, sunrise: String, sunset: String) =>
    val timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val timeFormatter = DateTimeFormatter.ofPattern("h:mm:ss a")
    val time = LocalTime.parse(timestampUTC, timestampFormatter)
    val sunriseTime = LocalTime.parse(sunrise, timeFormatter)
    val sunsetTime = LocalTime.parse(sunset, timeFormatter)
    if (sunriseTime.compareTo(sunsetTime) > 0) {
      // in UTC, sunset can fall "before" sunrise on the clock
      // (e.g. sunset at 01:00, sunrise at 15:00): the night is the interval between the two
      if (time.compareTo(sunsetTime) > 0 && time.compareTo(sunriseTime) < 0) {
        "Night"
      } else {
        "Day"
      }
    } else {
      // usual case: the day is the interval between sunrise and sunset
      if (time.compareTo(sunriseTime) > 0 && time.compareTo(sunsetTime) < 0) {
        "Day"
      } else {
        "Night"
      }
    }
  }
  df
    .join(sunsetDF, df("Date") === sunsetDF("date"))
    .withColumn("DayOrNight", dayOrNightUDF(col("TimestampUTC"), col("sunrise"), col("sunset")))
}
We join with the sunsetDF
dataframe in order to benefit from the sunrise and
sunset columns.
You’ll notice that we don’t directly use the provided Dates
variable. This is
because sunrise/sunset times are given by the API in the UTC timezone. As a
result we converted Dates
to the UTC timezone (TimestampUTC
) and extracted
its date (Date
).
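The creation of the TimestampUTC and Date columns isn’t shown in this post; a possible way to derive them as part of the time enrichment, using Spark’s built-in functions (the timezone id is an assumption), would be:

import org.apache.spark.sql.functions.{col, date_format, to_utc_timestamp}

df
  // Dates is expressed in PST; convert it to UTC so it can be compared to the API's times
  .withColumn("TimestampUTC",
    date_format(to_utc_timestamp(col("Dates"), "America/Los_Angeles"), "yyyy-MM-dd HH:mm:ss"))
  // keep only the date part to join with sunsetDF's date column
  .withColumn("Date", date_format(col("TimestampUTC"), "yyyy-MM-dd"))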
We could imagine that incidents in a few categories, like VEHICLE THEFT, occur mostly outdoors, where the weather could have an influence, as opposed to other types of incidents which occur indoors and for which the weather wouldn’t have any impact.
To check this assumption, I assembled a dataset containing the most frequent weather condition and the average temperature for every day in the dataset using wunderground.com. The script can be found here. It works similarly to the sunrise/sunset scraping we just saw, except that wunderground gives us csv instead of json.
The resulting dataset can be found here.
As we’ve done before, we need to turn this dataset into a dataframe:
val weatherDF = {
val rdd = sc.wholeTextFiles(weatherFile).map(_._2)
sqlContext.read.json(rdd)
}
Next, we join our training and test dataframes with the new weatherDF
:
def enrichWeather(weatherDF: DataFrame)(df: DataFrame): DataFrame =
df.join(weatherDF, df("Date") === weatherDF("date"))
As a result, we get two new features: weather and temperatureC.
Finally, I wanted to see what I could do with the latitude and longitude variables. I came up with the idea of trying to find the neighborhood where the incident occurred, thinking that particular types of crimes are more likely to happen in particular neighborhoods.
To find San Francisco neighborhoods as latitude/longitude polygons, I used the Zillow API, which fitted the bill perfectly, providing neighborhoods for California as a shapefile. I used a bit of R to turn the shapefile into a json file containing the polygons as WKT, which can be found here.

Now that we have our SF neighborhoods as polygons, we still have to determine in which one of these neighborhoods every incident occurred. For this task, I used the ESRI geometry api, which lets you do all kinds of spatial data processing and, in our case, check whether a point (corresponding to an incident) is inside a polygon (corresponding to a neighborhood).
First up, we’ll need a function which turns WKT into ESRI geometries (i.e. Point and Polygon):
def createGeometryFromWKT[T <: Geometry](wkt: String): T = {
val wktImportFlags = WktImportFlags.wktImportDefaults
val geometryType = Geometry.Type.Unknown
val g = OperatorImportFromWkt.local().execute(wktImportFlags, geometryType, wkt, null)
g.asInstanceOf[T]
}
Next, we need a function which tells us if a geometry contains another:
def contains(container: Geometry, contained: Geometry): Boolean =
OperatorContains.local().execute(container, contained, SpatialReference.create(3426), null)
As you may have noticed, the API is a bit cumbersome but it’s really well-documented.
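For illustration, here is how these two helpers fit together on made-up WKT (a rough box and a point inside it):

val box = createGeometryFromWKT[Polygon](
  "POLYGON ((-122.5 37.7, -122.4 37.7, -122.4 37.8, -122.5 37.8, -122.5 37.7))")
val pt = createGeometryFromWKT[Point]("POINT (-122.45 37.75)")

contains(box, pt) // true: the point falls inside the polygon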
Now that we have our neighborhoods and our utility functions, we can write a UDF which will tell us in which neighborhood an incident occurred:
def enrichNeighborhoods(nbhds: Seq[Neighborhood])(df: DataFrame): DataFrame = {
  def nbhdUDF = udf { (lng: Double, lat: Double) =>
    // WKT points are expressed as (longitude latitude), i.e. (X Y)
    val point = createGeometryFromWKT[Point](s"POINT($lng $lat)")
    nbhds
      .filter(nbhd => contains(nbhd.polygon, point))
      .map(_.name)
      .headOption
      .getOrElse("SF") // fallback when the incident doesn't fall in any known neighborhood
  }
  df.withColumn("Neighborhood", nbhdUDF(col("X"), col("Y")))
}
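The Neighborhood type and the nbhds collection aren’t shown in this post; a minimal sketch of what they could look like, assuming the json file maps each neighborhood name to its WKT polygon, would be:

// hypothetical representation of a neighborhood: its name and its polygon
case class Neighborhood(name: String, polygon: Polygon)

// e.g. built from the (name -> WKT) pairs contained in the json produced by the R script
def toNeighborhoods(namesToWKT: Map[String, String]): Seq[Neighborhood] =
  namesToWKT.map { case (name, wkt) =>
    Neighborhood(name, createGeometryFromWKT[Polygon](wkt))
  }.toSeq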
We can now add those features to both our datasets:
val enrichFunctions = List(enrichTime, enrichWeekend, enrichAddress,
enrichDayOrNight(sunsetDF)(_), enrichWeather(weatherDF)(_), enrichNeighborhoods(nbhds)(_))
val Array(enrichedTrainDF, enrichedTestDF) =
Array(rawTrainDF, rawTestDF) map (enrichFunctions reduce (_ andThen _))
Now that we have all our features, we can build our machine learning pipeline.
In order to be processed by our classifier, our categorical features need to be indexed; this is done with StringIndexer. However, we still have one difficulty to sort out: unfortunately, some addresses are only present in the test dataset. Hence, we’ll have to index our categorical variables on all the data:
val allData = enrichedTrainDF
.select((numericFeatColNames ++ categoricalFeatColNames).map(col): _*)
.unionAll(enrichedTestDF
.select((numericFeatColNames ++ categoricalFeatColNames).map(col): _*))
allData.cache()
val stringIndexers = categoricalFeatColNames.map { colName =>
new StringIndexer()
.setInputCol(colName)
.setOutputCol(colName + "Indexed")
.fit(allData)
}
Since our label variable is also categorical, we’ll have to index it as well:
val labelIndexer = new StringIndexer()
.setInputCol(labelColName)
.setOutputCol(labelColName + "Indexed")
.fit(enrichedTrainDF)
Then, we can assemble all our features into a single vector column as required by Spark’s ML algorithms:
val assembler = new VectorAssembler()
.setInputCols((categoricalFeatColNames.map(_ + "Indexed") ++ numericFeatColNames).toArray)
.setOutputCol(featuresColName)
Finally, we can define our classifier:
val randomForest = new RandomForestClassifier()
.setLabelCol(labelColName + "Indexed")
.setFeaturesCol(featuresColName)
.setMaxDepth(10)
.setMaxBins(2089)
I chose a random forest classifier because it is one of the few multiclass classifiers available in Spark (OneVsRest coupled with LogisticRegression wasn’t really an option on my computer).

A note regarding the parameters: maxBins has to be at least as large as the number of distinct values of any categorical feature, hence the value of 2089, which corresponds to the cardinality of the Street column.

Because we indexed our label variable Category to CategoryIndexed, we’ll need to get back our original labels; this is done with IndexToString:
val indexToString = new IndexToString()
.setInputCol("prediction")
.setOutputCol(predictedLabelColName)
.setLabels(labelIndexer.labels)
We can finally construct our pipeline:
val pipeline = new Pipeline()
.setStages(Array.concat(
stringIndexers.toArray,
Array(labelIndexer, assembler, randomForest, indexToString)
))
We can now find the best model through cross validation. In this example, cross validation is a bit artificial because I was limited by my computer in terms of processing power.
To run cross validation, you’ll have to set up three different things: an evaluator, a parameter grid and the cross validator itself:
val evaluator = new MulticlassClassificationEvaluator()
.setLabelCol(labelColName + "Indexed")
val paramGrid = new ParamGridBuilder()
.addGrid(randomForest.impurity, Array("entropy", "gini"))
.build()
val cv = new CrossValidator()
.setEstimator(pipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(paramGrid)
.setNumFolds(3)
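With more processing power, the grid could be extended to other hyperparameters of the random forest, for example (hypothetical values):

val biggerParamGrid = new ParamGridBuilder()
  .addGrid(randomForest.impurity, Array("entropy", "gini"))
  .addGrid(randomForest.maxDepth, Array(10, 15, 20))
  .addGrid(randomForest.numTrees, Array(20, 40))
  .build()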
We can finally train our model:
val cvModel = cv.fit(enrichedTrainDF)
and make predictions on the test set:
val predictions = cvModel
.transform(enrichedTestDF)
.select("Id", predictedLabelColName)
A shortcoming of using cross validation is that, to find the best parameters, you have to look at the INFO-level logs of your Spark application, which tend to be really noisy.

To circumvent this, you can disable INFO-level logs and retrieve the best parameters directly:
val bestEstimatorParamMap = cvModel.getEstimatorParamMaps
.zip(cvModel.avgMetrics)
.maxBy(_._2)
._1
println(bestEstimatorParamMap)
Another nifty trick I learned is how to retrieve the importance of every feature in your model:
val featureImportances = cvModel
  .bestModel.asInstanceOf[PipelineModel]
  // the random forest sits after the string indexers, the label indexer and the assembler
  .stages(categoricalFeatColNames.size + 2)
  .asInstanceOf[RandomForestClassificationModel].featureImportances
assembler.getInputCols
.zip(featureImportances.toArray)
.foreach { case (feat, imp) => println(s"feature: $feat, importance: $imp") }
Last step: we have to format our results according to the Kaggle instructions, which require one column per Category, each filled with 0 except for the predicted Category, which gets a 1.
First, we need a list of all labels sorted alphabetically:
val labels = enrichedTrainDF.select(labelColName).distinct().collect()
.map { case Row(label: String) => label }
.sorted
Next, we need a function which will turn our predicted Category
into a
sequence of zeros and a one at the right spot:
val labelToVec = (predictedLabel: String) => {
val array = new Array[Int](labels.length)
array(labels.indexOf(predictedLabel)) = 1
array.toSeq
}
Then, we can create our result dataframe:
val schema = StructType(predictions.schema.fields ++ labels.map(StructField(_, IntegerType)))
val resultDF = sqlContext.createDataFrame(
predictions.rdd.map { r => Row.fromSeq(
r.toSeq ++
labelToVec(r.getAs[String](predictedLabelColName))
)},
schema
)
Basically, we build the schema we want and recreate a dataframe from the existing predictions so that it fits this schema.
Finally, we can write it to a file:
resultDF
.drop("predictedLabel")
.coalesce(1)
.write
.format(csvFormat)
.option("header", "true")
.save(outputFile)
I wrote a little script which takes care of the file handling and submission to
Spark which you can find here. It just needs spark-submit
to be on your path.
I hope this post demonstrated the power of Apache Spark and, in particular, how easy it is to manipulate features and create new ones.
However, there are a few things which could have been done differently/better: