Finding related searches with Spark

Unsupervised learning from user behaviour

When a user navigates a site they leave a valuable trail of information - what their first search was, what they followed this search with, and so on. Using this data we can learn related searches automatically by co-occurrence counting.

This post takes you through the steps to get from raw search logs to results using the Spark cluster computing framework.

Spark provides a natural language for expressing flows of data, and can be scaled out to a cluster when data growth dictates.

Input

The tab-separated logs were taken from a real estate website:

2012-12-31 06:02:59     10.1.2.45      /search?q=Colorado
2012-12-31 06:15:43     10.1.161.78    /search?q=Phoenix%2C%20AZ
...

The logs are basic - just timestamp, IP address and URL-encoded search query. Ideally tracking would be by a cookie-based session ID, but in this case IP will do. There are a few million rows of data stored on S3, which Spark will happily pull straight from.

Step by step

Download the Joda-Time jars to help with the timestamp parsing and fire up the Spark shell:

$ cd spark-0.8.0-incubating-bin-cdh4
$ wget http://repo1.maven.org/maven2/joda-time/joda-time/2.3/joda-time-2.3.jar
$ wget http://repo1.maven.org/maven2/org/joda/joda-convert/1.5/joda-convert-1.5.jar
$ ADD_JARS=joda-time-2.3.jar,joda-convert-1.5.jar ./spark-shell

Read in the data, split it on tab:

val logs = sc.textFile("s3n://key:secret@mybucket/log*").map(_.split("\t"))

Parse the timestamp and query:

import org.joda.time.format.DateTimeFormat
import java.net.URLDecoder
import scala.util.{Try, Success}
val re_q = """q=([^&]+)""".r
def parseDT(s: String) = DateTimeFormat.forPattern("YYYY-MM-dd HH:mm:ss").parseDateTime(s)
val entries = logs.collect {
  // extract query term
  case Array(ts, ip, re_q(escaped)) =>
    // parse timestamp and url decode query term
    // (the single-argument URLDecoder.decode is deprecated, so name the charset)
    (Try(parseDT(ts)), ip, Try(URLDecoder.decode(escaped, "UTF-8")))
}.collect {
  case(Success(ts), ip, Success(term)) => (ip, (ts, term))
}

This pattern of Try/collect/case works well for dealing with dodgy data. You’ll always get dodgy data - empty strings, bad encodings, etc. so it’s a good idiom to filter these out without involving lots of try/catch ugliness.
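
To see the idiom in isolation, here is a minimal sketch on a plain Scala list rather than an RDD. The sample strings are made up for illustration; the second one has a broken percent-escape, which makes URLDecoder throw.

```scala
import java.net.URLDecoder
import scala.util.{Try, Success}

// Hypothetical raw query strings; "bad%ZZescape" is not valid percent-encoding.
val raw = List("Phoenix%2C%20AZ", "bad%ZZescape", "Colorado")

// Wrap the risky call in Try, then keep only the successes with collect.
val decoded = raw
  .map(s => Try(URLDecoder.decode(s, "UTF-8")))
  .collect { case Success(term) => term }

decoded.foreach(println)
```

The failed decode becomes a Failure and is silently dropped by the partial function, so no try/catch block is needed around the parsing code.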

Next, group them by IP:

val sessions = entries.groupByKey

Now the smart part - for each session sort the requests, take consecutive pairs (sliding(2)) of searches, and filter down to just pairs that were within 20 minutes of each other. We assume any searches over 20 minutes apart probably weren’t related.

The session IP is thrown away at this point too, and the results are flatMapped together.

// import first, so DateTime is in scope for the Ordering below
import org.joda.time.{DateTime, Minutes}

// define an Ordering for DateTime
implicit def dateTimeOrdering: Ordering[DateTime] = Ordering.fromLessThan(_ isBefore _)

// define a filter function: true when the two requests are less than 20 minutes apart
def nearFilter(t1: DateTime, t2: DateTime) = Minutes.minutesBetween(t1, t2).getMinutes < 20

val pairs = sessions.flatMap { case (_, requests) =>
  requests.sorted.sliding(2).collect {
    case Seq(a, b) if nearFilter(a._1, b._1) => (a._2, b._2)
  }
}
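
The sort/sliding/filter step can be tried out on a single made-up session without Spark or Joda. This is only a sketch: the session data is hypothetical and timestamps are simplified to minutes since the first request.

```scala
// Hypothetical session: (minutes since first request, search term), deliberately unsorted.
val session = Seq(
  (30, "Queens, NY"),
  (0, "Brooklyn, NY"),
  (5, "Manhattan, NY"),
  (90, "Hoboken, NJ")
)

// Sort by time, pair each search with the next one,
// and keep only pairs that are less than 20 minutes apart.
val localPairs = session.sortBy(_._1).sliding(2).collect {
  case Seq((t1, a), (t2, b)) if t2 - t1 < 20 => (a, b)
}.toList

// A session with a single request yields one window of size 1,
// which the Seq(a, b) pattern does not match, so it produces no pairs.
val single = Seq((0, "Colorado")).sliding(2).collect {
  case Seq((_, a), (_, b)) => (a, b)
}.toList
```

Only the Brooklyn to Manhattan pair survives here, since the other consecutive gaps are 25 and 60 minutes.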

A quick check of the output:

pairs.take(10)
Array[(java.lang.String, java.lang.String)] = Array((Juno Beach, FL,Jupiter, FL), (San Francisco, CA,33469), (33408,33408), (33408,Juno Beach, FL), (Los Angeles, CA,Los Angeles, CA), (91107,91107), (California,California), (California,California), (California,California), (California,California))

Oops, forgot about repeat searches, let’s filter those out:

val upairs = pairs.filter { case (a, b) => a != b }

Finally, count the results up (similar to word count) and print the top results. Here each pair is weighted equally (1), but there’s no reason you couldn’t bias the weighting towards searches that happen closer together in time, for example.

val counts = upairs.map((_, 1)).reduceByKey(_+_)
// print top 5 results by frequency
counts.top(5)(Ordering.by(_._2)).foreach(println)

The top results aren’t hugely interesting - the most popular related searches are big city -> big city:

((NY,San Francisco, CA),7210)
((Los Angeles, CA,NY),6206)
((San Francisco, CA,Los Angeles, CA),4190)
((San Francisco, CA,NY),4187)
((NY,Los Angeles, CA),3186)

More interesting results come from looking at specific boroughs, e.g. Brooklyn, NY:

counts.filter(_._1._1 == "Brooklyn, NY").top(10)(Ordering.by(_._2)).foreach(println)

Results:

((Brooklyn, NY,NY),325)
((Brooklyn, NY,Queens, NY),275)
((Brooklyn, NY,Manhattan, NY),193)
((Brooklyn, NY,Los Angeles, CA),183)
((Brooklyn, NY,Staten Island, NY),167)
((Brooklyn, NY,Bronx, NY),154)
((Brooklyn, NY,New York),137)
((Brooklyn, NY,Philadelphia, PA),123)
((Brooklyn, NY,Hoboken, NJ),102)
((Brooklyn, NY,Astoria, NY),92)

The counts are fairly low because I was testing with a small sample of the data, but even so they make a lot of sense: people tend to follow a search for one borough with a search for a neighbouring one.
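
As an aside on the weighting idea mentioned earlier, one possible scheme is sketched below on plain Scala collections. The linear decay, the `weight` function and the sample gaps are all assumptions for illustration, not something taken from the logs.

```scala
// Hypothetical pairs that also carry the gap in minutes between the two searches.
val timedPairs = Seq(
  ("Brooklyn, NY", "Queens, NY", 1),
  ("Brooklyn, NY", "Queens, NY", 9),
  ("Brooklyn, NY", "Hoboken, NJ", 19)
)

// Assumed weighting: 1.0 for back-to-back searches, decaying linearly to 0 at 20 minutes.
def weight(gapMinutes: Int): Double = math.max(0.0, 1.0 - gapMinutes / 20.0)

// Local stand-in for map + reduceByKey: sum the weights per pair.
val weighted = timedPairs
  .map { case (a, b, gap) => ((a, b), weight(gap)) }
  .groupBy(_._1)
  .map { case (pair, ws) => (pair, ws.map(_._2).sum) }

weighted.foreach(println)
```

On an RDD the same shape would be a map to ((a, b), weight) followed by reduceByKey(_ + _), provided the pairs step is changed to keep the time gap alongside the two terms.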

Usages

There are several possibilities for how the results could be used. They could be used for suggesting related searches to a user, with the aim of increasing site retention by offering easier cross navigation between likely searches. They could also provide alternatives on zero results pages.
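
For the suggestion use case, the counts could be turned into a simple lookup table. The sketch below works on a small local sample in the same ((from, to), count) shape; the `suggestions` helper and the sample rows are hypothetical.

```scala
// Hypothetical sample in the same ((from, to), count) shape as the counts output.
val sampleCounts = Seq(
  (("Brooklyn, NY", "NY"), 325),
  (("Brooklyn, NY", "Queens, NY"), 275),
  (("Brooklyn, NY", "Manhattan, NY"), 193),
  (("Queens, NY", "Brooklyn, NY"), 40)
)

// Build a lookup from a search term to its top-n follow-up searches by count.
def suggestions(counts: Seq[((String, String), Int)], n: Int): Map[String, Seq[String]] =
  counts
    .groupBy { case ((from, _), _) => from }
    .map { case (from, rows) =>
      from -> rows.sortBy { case (_, c) => -c }.take(n).map { case ((_, to), _) => to }
    }

val related = suggestions(sampleCounts, 2)

// Terms with no recorded follow-ups fall back to an empty list.
val none = related.getOrElse("Nowhere, XX", Seq.empty)
```

A zero results page could use the same lookup, falling back to the empty list when a term has never been followed by another search.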

You could even look at time-based trends in the data - as people move out of more expensive suburbs to up and coming cheaper areas, do the co-occurrence patterns change to reflect this?

Further reading

An interesting paper on this subject (and much more) is one on the LinkedIn system, Metaphor.