My entry to the Kaggle SF crime classification competition using Apache Spark

June 8, 2016 -

This post will detail how I built my entry to the Kaggle San Francisco crime classification competition using Apache Spark and the new ML library.

We’ll be exploring the San Francisco crime dataset, which contains incidents that took place between 2003 and 2015, as detailed on the Kaggle competition page.

You can find the code for this post on Github.

The dataset

The dataset given by Kaggle is split into two archives train.csv.zip and test.csv.zip. The training and test datasets contain a few features:

- Dates: the timestamp of the incident
- Category: the category of the incident (the target variable, training set only)
- Descript: a short description of the incident (training set only)
- DayOfWeek: the day of the week
- PdDistrict: the name of the police department district
- Resolution: how the incident was resolved (training set only)
- Address: the approximate street address of the incident
- X and Y: the longitude and latitude of the incident

The test set also contains an Id column identifying each incident.

The goal of the competition is to predict the Category variable, which corresponds to the type of offense committed. For example, here are the ten most common offenses:

Category count
LARCENY/THEFT 174900
OTHER OFFENSES 126182
NON-CRIMINAL 92304
ASSAULT 76876
DRUG/NARCOTIC 53971
VEHICLE THEFT 53781
VANDALISM 44725
WARRANTS 42214
BURGLARY 36755
SUSPICIOUS OCC 31414


Loading the data

Since the data is in CSV format, we’ll use spark-csv to parse the CSV files and give us back DataFrames.
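
spark-csv isn’t bundled with Spark 1.x, so it has to be added as a dependency; a plausible build.sbt line (the version is only an example):

libraryDependencies += "com.databricks" %% "spark-csv" % "1.4.0"

The following function then loads both files: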

val csvFormat = "com.databricks.spark.csv"
def loadData(
    trainFile: String,
    testFile: String,
    sqlContext: SQLContext
): (DataFrame, DataFrame) = {
  val schemaArray = Array(
    StructField("Id", LongType),
    StructField("Dates", TimestampType),
    StructField("Category", StringType), // target variable
    StructField("Descript", StringType),
    StructField("DayOfWeek", StringType),
    StructField("PdDistrict", StringType),
    StructField("Resolution", StringType),
    StructField("Address", StringType),
    StructField("X", DoubleType),
    StructField("Y", DoubleType)
  )

  val trainSchema = StructType(schemaArray.filterNot(_.name == "Id"))
  val testSchema = StructType(schemaArray.filterNot { p =>
    Seq("Category", "Descript", "Resolution") contains p.name
  })

  val trainDF = sqlContext.read
    .format(csvFormat)
    .option("header", "true")
    .schema(trainSchema)
    .load(trainFile)

  val testDF = sqlContext.read
    .format(csvFormat)
    .option("header", "true")
    .schema(testSchema)
    .load(testFile)

  (trainDF, testDF)
}

This function takes the paths to the uncompressed training and test files and a sqlContext which will have been initialized beforehand like so:

val sc = new SparkContext(new SparkConf().setAppName("SFCrime"))
val sqlContext = new SQLContext(sc)

I should point out that you can let Spark infer the schema. However, I tend to always specify the type of every field to make sure every value of every variable is of the expected type.
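
For reference, letting spark-csv infer the schema would look something like this (the inferSchema option triggers an extra pass over the data):

val inferredTrainDF = sqlContext.read
  .format(csvFormat)
  .option("header", "true")
  .option("inferSchema", "true")
  .load(trainFile)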

We can now load our data:

val (rawTrainDF, rawTestDF) = loadData(trainFile, testFile, sqlContext)

As a side note, CSV parsing will be built into Apache Spark 2.0, so we won’t have to rely on an external jar anymore.
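
In Spark 2.0, the loading code should look roughly like this (a sketch based on the 2.0 API, where SparkSession replaces SQLContext and the schema defined above is reused):

val spark = SparkSession.builder().appName("SFCrime").getOrCreate()
val trainDF = spark.read
  .option("header", "true")
  .schema(trainSchema)
  .csv(trainFile)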

Feature engineering

Basically, we’re left with six features: Dates, DayOfWeek, PdDistrict, Address, X and Y (Descript and Resolution are only present in the training set, so they can’t be used for prediction).

Let’s see if there are other features which could help our classification.

Along the lines of DayOfWeek, I wanted to see whether other time-related features had any impact on the type of offense being committed.

Intuitively, given the top 10 categories, we would expect some types of crime, such as vehicle theft or vandalism, to take place mostly during night hours.
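
As a quick check of that intuition (using the HourOfDay column introduced a bit further down), a query along these lines shows at which hours a given category occurs most often:

enrichedTrainDF
  .filter(enrichedTrainDF("Category") === "VEHICLE THEFT")
  .groupBy("HourOfDay")
  .count()
  .sort(desc("count"))
  .show(5)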

Another trend I wanted to investigate was whether there were more crimes being committed during certain years (and months). For example, here is the distribution of LARCENY/THEFT crimes according to the year the incident occurred:

Year count
2014 18901
2013 18152
2012 15639
2006 13798
2011 13084


We clearly see an upward trend in the last few years.

This can be obtained with:

enrichedTrainDF
  .filter(enrichedTrainDF("Category") === "LARCENY/THEFT")
  .groupBy("Year")
  .count()
  .sort(desc("count"))
  .show(5)

Conversely, ASSAULT incidents occurred mostly during the first few years of the time span covered by this dataset:

Year count
2003 6555
2004 6467
2006 6364
2008 6327
2013 6280


To sum things up, we need three new features: the hour of day, the month and the year. Fortunately, Spark comes bundled with a lot of SQL-like utility functions, which were presented in a Databricks blog post a few months back. These will greatly simplify our work:

val enrichTime = (df: DataFrame) => {
  df
    .withColumn("HourOfDay", hour(col("Dates")))
    .withColumn("Month", month(col("Dates")))
    .withColumn("Year", year(col("Dates")))
}

Weekend feature

Next up, I wrote a UDF to check if the incident took place during the weekend:

val enrichWeekend = (df: DataFrame) => {
  def weekendUDF = udf { (dayOfWeek: String) =>
    dayOfWeek match {
      case "Friday" | "Saturday" | "Sunday" => 1
      case _ => 0
    }
  }
  df.withColumn("Weekend", weekendUDF(col("DayOfWeek")))
}

Address features

Then, I wanted to make the Address variable usable. If you have a look at a few addresses in the dataset, you’ll notice that they come in two forms:

- a block address, such as 100 Block of A STREET
- an intersection of two streets, such as A STREET / B STREET

Consequently, I introduced two features from this column:

- AddressType: whether the address is a street or an intersection
- Street: the street extracted from the address

Unfortunately, when Address is an intersection, the Street variable will only contain the first street (alphabetically). So it is possible that two intersection addresses sharing a common street won’t result in the same Street value.

For example, given the two addresses A STREET / B STREET and B STREET / C STREET, the resulting Street values will be A STREET and B STREET respectively.

val enrichAddress = (df: DataFrame) => {
  def addressTypeUDF = udf { (address: String) =>
    if (address contains "/") "Intersection"
    else "Street"
  }

  val streetRegex = """\d{1,4} Block of (.+)""".r
  val intersectionRegex = """(.+) / (.+)""".r
  def addressUDF = udf { (address: String) =>
    streetRegex findFirstIn address match {
      case Some(streetRegex(s)) => s
      case None => intersectionRegex findFirstIn address match {
        case Some(intersectionRegex(s1, s2)) => if (s1 < s2) s1 else s2
        case None => address
      }
    }
  }
  df
    .withColumn("AddressType", addressTypeUDF(col("Address")))
    .withColumn("Street", addressUDF(col("Address")))
}

Day or night feature

Along the same lines as the HourOfDay feature, I reasoned that it would be interesting to know whether an incident occurred during the day or during the night.

To build this feature, I used the sunrise-sunset.org API with this script. It makes a request to the API for each day present in the dataset to retrieve the sunrise and sunset times, parses the JSON-formatted result thanks to circe (which is a great JSON library for Scala, by the way) and writes it to a file. One important thing to note about this script: a request is made every five seconds in order not to overload the API.
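
The actual script lives in the repo; as a rough idea of its shape, a minimal sketch could look like this (the coordinates, output file name and example date are illustrative, and it assumes a recent Scala/circe version):

import java.io.PrintWriter
import scala.io.Source
import io.circe.parser.parse

// Coordinates used for San Francisco (illustrative values).
val (lat, lng) = (37.7749, -122.4194)

// Fetch the sunrise and sunset times (in UTC) for one date (yyyy-MM-dd).
def sunTimes(day: String): Option[(String, String)] = {
  val url = s"https://api.sunrise-sunset.org/json?lat=$lat&lng=$lng&date=$day"
  for {
    json    <- parse(Source.fromURL(url).mkString).toOption
    results  = json.hcursor.downField("results")
    sunrise <- results.downField("sunrise").as[String].toOption
    sunset  <- results.downField("sunset").as[String].toOption
  } yield (sunrise, sunset)
}

// In practice, `days` would be the distinct dates of the dataset; one example date here.
val days: Seq[String] = Seq("2003-01-06")

val out = new PrintWriter("sunsetrise.json") // hypothetical output path
days.foreach { day =>
  sunTimes(day).foreach { case (sunrise, sunset) =>
    out.println(s"""{"date":"$day","sunrise":"$sunrise","sunset":"$sunset"}""")
  }
  Thread.sleep(5000) // throttle: one request every five seconds
}
out.close()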

The resulting dataset can be found in the repo.

Once we have all our sunset and sunrise times, we can load them into a dataframe:

val sunsetDF = {
  val rdd = sc.wholeTextFiles(sunsetFile).map(_._2)
  sqlContext.read.json(rdd)
}

We still have to write a UDF to determine if the incident took place during the night or the day. To do that we just compare our timestamp to the time of sunrise and sunset:

def enrichDayOrNight(sunsetDF: DataFrame)(df: DataFrame): DataFrame = {
  def dayOrNightUDF = udf { (timestampUTC: String, sunrise: String, sunset: String) =>
    val timestampFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
    val timeFormatter = DateTimeFormatter.ofPattern("h:mm:ss a")
    val time = LocalTime.parse(timestampUTC, timestampFormatter)
    val sunriseTime = LocalTime.parse(sunrise, timeFormatter)
    val sunsetTime = LocalTime.parse(sunset, timeFormatter)
    if (sunriseTime.compareTo(sunsetTime) > 0) {
      if (time.compareTo(sunsetTime) > 0 && time.compareTo(sunriseTime) < 0) {
        "Night"
      } else {
        "Day"
      }
    } else {
      if (time.compareTo(sunriseTime) > 0 && time.compareTo(sunsetTime) < 0) {
        "Day"
      } else {
        "Night"
      }
    }
  }

  df
    .join(sunsetDF, df("Date") === sunsetDF("date"))
    .withColumn("DayOrNight", dayOrNigthUDF(col("TimestampUTC"), col("sunrise"), col("sunset")))
}

We join with the sunsetDF dataframe in order to get access to the sunrise and sunset columns. You’ll notice that we don’t directly use the provided Dates variable. This is because the sunrise/sunset times are given by the API in the UTC timezone. As a result, we converted Dates to the UTC timezone (TimestampUTC) and extracted its date (Date).
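
That conversion isn’t shown above; a minimal sketch of what it could look like, assuming the original Dates are expressed in the America/Los_Angeles timezone and using Spark’s to_utc_timestamp, date_format and to_date functions:

val toUTC = to_utc_timestamp(col("Dates"), "America/Los_Angeles")
df
  .withColumn("TimestampUTC", date_format(toUTC, "yyyy-MM-dd HH:mm:ss"))
  .withColumn("Date", to_date(toUTC))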

Weather features

We could imagine that incidents in a few categories, such as VEHICLE THEFT, mostly occur outdoors and could therefore be influenced by the weather, as opposed to other types of incidents which occur indoors and on which the weather has no impact.

To check this assumption, I assembled a dataset containing the most frequent weather condition and the average temperature for every day covered by the dataset, using wunderground.com. The script can be found here. It works similarly to the sunrise/sunset scraping we just saw, except that wunderground gives us CSV instead of JSON.

The resulting dataset can be found here.

As we’ve done before, we need to turn this dataset into a dataframe:

val weatherDF = {
  val rdd = sc.wholeTextFiles(weatherFile).map(_._2)
  sqlContext.read.json(rdd)
}

Next, we join our training and test dataframes with the new weatherDF:

def enrichWeather(weatherDF: DataFrame)(df: DataFrame): DataFrame =
  df.join(weatherDF, df("Date") === weatherDF("date"))

As a result, we get two new features: weather and temperatureC.

Neighborhood feature

Finally, I wanted to see what I could do with the latitude and longitude variables. I came up with the idea of finding the neighborhood where the incident occurred, thinking that particular types of crime are more likely to happen in particular neighborhoods.

To get the San Francisco neighborhoods as latitude/longitude polygons, I used the Zillow API, which fit the bill perfectly by providing neighborhoods for California as a shapefile. I used a bit of R to turn the shapefile into a JSON file containing the polygons as WKT, which can be found here.

Now that we have our SF neighborhoods as polygons, we still have to determine in which of these neighborhoods each incident occurred. For this task, I used the ESRI geometry api, which lets you do all kinds of spatial data processing; in my case, checking whether a point (corresponding to an incident) is inside a polygon (corresponding to a neighborhood).

First up, we’ll need a function which turns WKT into ESRI geometries (i.e. Point and Polygon):

def createGeometryFromWKT[T <: Geometry](wkt: String): T = {
  val wktImportFlags = WktImportFlags.wktImportDefaults
  val geometryType = Geometry.Type.Unknown
  val g = OperatorImportFromWkt.local().execute(wktImportFlags, geometryType, wkt, null)
  g.asInstanceOf[T]
}

Next, we need a function which tells us if a geometry contains another:

def contains(container: Geometry, contained: Geometry): Boolean =
  OperatorContains.local().execute(container, contained, SpatialReference.create(3426), null)

As you may have noticed, the API is a bit cumbersome but it’s really well-documented.
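
A quick note: the Neighborhood type used below isn’t defined in the post. A minimal sketch of what it could look like, assuming the neighborhoods file is a JSON array of objects with name and wkt fields (the file layout, the field names and the neighborhoodsFile path are all assumptions):

import io.circe.parser.decode
import scala.io.Source

// A neighborhood is just a name and its polygon.
case class Neighborhood(name: String, polygon: Polygon)

val nbhds: Seq[Neighborhood] =
  decode[Seq[Map[String, String]]](Source.fromFile(neighborhoodsFile).mkString)
    .getOrElse(Seq.empty)
    .map(m => Neighborhood(m("name"), createGeometryFromWKT[Polygon](m("wkt"))))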

Now that we have our neighborhoods and our utility functions, we can write a UDF which will tell us in which neighborhood an incident occurred:

def enrichNeighborhoods(nbhds: Seq[Neighborhood])(df: DataFrame): DataFrame = {
  def nbhdUDF = udf { (lng: Double, lat: Double) =>
    // WKT points are expressed as (x y), i.e. (longitude latitude)
    val point = createGeometryFromWKT[Point](s"POINT($lng $lat)")
    nbhds
      .find(nbhd => contains(nbhd.polygon, point))
      .map(_.name)
      .getOrElse("SF")
  }
  df.withColumn("Neighborhood", nbhdUDF(col("X"), col("Y")))
}

Putting it all together

We can now add those features to both our datasets:

val enrichFunctions = List(enrichTime, enrichWeekend, enrichAddress,
  enrichDayOrNight(sunsetDF)(_), enrichWeather(weatherDF)(_), enrichNeighborhoods(nbhds)(_))
val Array(enrichedTrainDF, enrichedTestDF) =
  Array(rawTrainDF, rawTestDF) map (enrichFunctions reduce (_ andThen _))


Machine learning pipeline

Now that we have all our features, we can build our machine learning pipeline.

Building the pipeline

In order to be processed by our classifier, our categorical features need to be indexed; this is done with StringIndexer. However, we still have one difficulty to sort out: unfortunately, some addresses are only present in the test dataset. Hence, we’ll have to index our categorical variables on all the data:

val allData = enrichedTrainDF
  .select((numericFeatColNames ++ categoricalFeatColNames).map(col): _*)
  .unionAll(enrichedTestDF
    .select((numericFeatColNames ++ categoricalFeatColNames).map(col): _*))
allData.cache()

val stringIndexers = categoricalFeatColNames.map { colName =>
  new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName + "Indexed")
    .fit(allData)
}

Since our label variable is also categorical, we’ll have to index it as well:

val labelIndexer = new StringIndexer()
  .setInputCol(labelColName)
  .setOutputCol(labelColName + "Indexed")
  .fit(enrichedTrainDF)

Then, we can assemble all our features into a single vector column as required by Spark’s ML algorithms:

val assembler = new VectorAssembler()
  .setInputCols((categoricalFeatColNames.map(_ + "Indexed") ++ numericFeatColNames).toArray)
  .setOutputCol(featuresColName)

Finally, we can define our classifier:

val randomForest = new RandomForestClassifier()
  .setLabelCol(labelColName + "Indexed")
  .setFeaturesCol(featuresColName)
  .setMaxDepth(10)
  .setMaxBins(2089)

I chose a random forest classifier because it is one of the only multiclass classifiers available in Spark (OneVsRest coupled with LogisticRegression wasn’t really an option on my computer).

A couple of notes regarding the parameters: maxBins has to be at least as large as the number of distinct values of our most cardinal categorical feature (otherwise Spark will refuse to train the model), and maxDepth is kept at 10 to keep training time and memory usage manageable on my computer.

Because we indexed our label variable Category into CategoryIndexed, we’ll need to get back our original labels; this is done with IndexToString:

val indexToString = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol(predictedLabelColName)
  .setLabels(labelIndexer.labels)

We can finally construct our pipeline:

val pipeline = new Pipeline()
  .setStages(Array.concat(
    stringIndexers.toArray,
    Array(labelIndexer, assembler, randomForest, indexToString)
  ))

Cross validation

We can now find the best model through cross validation. In this example, cross validation is a bit artificial because I was limited by my computer in terms of processing power.

To run cross validation, you’ll have to set up three different things: an evaluator, a grid of parameters to try, and the cross validator itself:

val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol(labelColName + "Indexed")
val paramGrid = new ParamGridBuilder()
  .addGrid(randomForest.impurity, Array("entropy", "gini"))
  .build()
val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(3)

We can finally train our model:

val cvModel = cv.fit(enrichedTrainDF)

and make predictions on the test set:

val predictions = cvModel
  .transform(enrichedTestDF)
  .select("Id", predictedLabelColName)

Best params

A shortcoming of cross validation is that, to find the best params, you normally have to dig through the INFO-level logs of your Spark application, which tend to be really noisy. To circumvent this, you can disable INFO-level logging and retrieve the best parameters directly:

val bestEstimatorParamMap = cvModel.getEstimatorParamMaps
  .zip(cvModel.avgMetrics)
  .maxBy(_._2)
  ._1
println(bestEstimatorParamMap)

Feature importances

Another nifty trick I learned is how to retrieve the importance of every feature in your model:

val featureImportances = cvModel
  .bestModel.asInstanceOf[PipelineModel]
  .stages(categoricalFeatColNames.size + 2)
  .asInstanceOf[RandomForestClassificationModel].featureImportances
assembler.getInputCols
  .zip(featureImportances.toArray)
  .foreach { case (feat, imp) => println(s"feature: $feat, importance: $imp") }

Formatting the results

Last step: we have to format our results according to the Kaggle instructions, which require one column per Category, each filled with a 0 except for the column of the predicted Category, which contains a 1.

First, we need a list of all labels sorted alphabetically:

val labels = enrichedTrainDF.select(labelColName).distinct().collect()
  .map { case Row(label: String) => label }
  .sorted

Next, we need a function which will turn our predicted Category into a sequence of zeros and a one at the right spot:

val labelToVec = (predictedLabel: String) => {
  val array = new Array[Int](labels.length)
  array(labels.indexOf(predictedLabel)) = 1
  array.toSeq
}

Then, we can create our result dataframe:

val schema = StructType(predictions.schema.fields ++ labels.map(StructField(_, IntegerType)))
val resultDF = sqlContext.createDataFrame(
  predictions.rdd.map { r => Row.fromSeq(
    r.toSeq ++
      labelToVec(r.getAs[String](predictedLabelColName))
  )},
  schema
)

Basically, we create the schema we want and rebuild a dataframe from the existing predictions so that it fits this schema.

Finally, we can write it to a file:

resultDF
  .drop("predictedLabel")
  .coalesce(1)
  .write
  .format(csvFormat)
  .option("header", "true")
  .save(outputFile)


spark-submit script

I wrote a little script which takes care of the file handling and submission to Spark which you can find here. It just needs spark-submit to be on your path.

Conclusion

I hope this post demonstrated the power of Apache Spark, and in particular how easy it is to manipulate features and create new ones.

However, there are a few things which could have been done differently/better: