Spark and Elasticsearch

Elastic Sparkle

If you work in the Hadoop world and have not yet heard of Spark, drop everything and go check it out. It’s a really powerful, intuitive and fast map/reduce system (and then some).

Where it beats Hadoop/Pig/Hive hands down is that it’s not a massive stack of quirky DSLs built on top of layers of clunky Java abstractions - it’s a simple, pure functional DSL with all the flexibility and succinctness of Scala. And it’s fast, and properly interactive - query, bam, response - not query, twiddle fingers, wait a bit... response.

And if you’re into search, you’ll no doubt have heard of Elasticsearch - a distributed restful search engine built upon Lucene.

They’re perfect bedfellows - crunch your raw data and spit it out into a search index ready for serving to your frontend. At the company I work for we’ve built the google-analytics-esque part of our product around this combination.

It’s so fast it flies - we can process raw event logs at 250,000 events/s without breaking a sweat on a meagre EC2 m1.large instance.

Anyway, enough babbling… to the demo…

If you just want to get running with a simple working project, check out the complete code for this tutorial at github.com/barnybug/spark-elasticsearch-blogpost.

Download

They’re both dead easy to install.

Get Elasticsearch from here (0.90.6 was the latest when this article was written):

$ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-0.90.6.tar.gz

Get Spark from here:

$ wget http://spark-project.org/download/spark-0.8.0-incubating-bin-cdh4.tgz

Get my patched Elasticsearch Hadoop plugin from github.com/barnybug/spark-elasticsearch-blogpost (I haven’t got Spark working with the latest release yet):

$ wget http://github.com/barnybug/spark-elasticsearch-blogpost/raw/master/lib/elasticsearch-hadoop-1.3.0.BUILD-SNAPSHOT.jar

Unpack everything:

$ tar zxvf elasticsearch-0.90.6.tar.gz 
$ tar zxvf spark-0.8.0-incubating-bin-cdh4.tgz

Run

$ cd elasticsearch-0.90.6
$ bin/elasticsearch

$ cd spark-0.8.0-incubating-bin-cdh4
$ ADD_JARS=../elasticsearch-hadoop-1.3.0.BUILD-SNAPSHOT.jar ./spark-shell

Index some sample data with Spark

In the spark shell, create a dataset with a familiar example tweet:

val tweet = Map("user" -> "kimchy", "post_date" -> "2009-11-15T14:12:12", "message" -> "trying out Elastic Search")
val tweets = sc.makeRDD(Seq(tweet))

Define a simple helper function, toWritable, to convert Scala Maps to Hadoop Writables:

import org.apache.hadoop.io.{MapWritable, Text, NullWritable}
def toWritable(in: Map[String, String]) = {
    val m = new MapWritable
    for ((k, v) <- in)
        m.put(new Text(k), new Text(v))
    m
}

Map the collection to the expected types for ESOutputFormat (ugly Object -> Object). The key is ignored, so we just use a NullWritable:

val writables = tweets.map(toWritable)
val output = writables.map { v => (NullWritable.get : Object, v : Object) }

Finally, save the dataset with ESOutputFormat, which will index it into the running Elasticsearch (the output ‘file’ name isn’t used, so we just pass "-"):

import org.elasticsearch.hadoop.mr.ESOutputFormat
sc.hadoopConfiguration.set("es.resource", "tweets/tweet")
output.saveAsHadoopFile[ESOutputFormat]("-")

Search for it

Hit localhost:9200/tweets/_search in your browser to see the indexed tweet.
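
Or, from the command line, a quick curl query works just as well (assuming Elasticsearch is on its default port, 9200):

$ curl 'http://localhost:9200/tweets/_search?q=user:kimchy&pretty=true'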

Bigger data

Now to put Spark to good use - indexing a bit more data. For want of a better sample, we’re going to index /var/log/syslog.

Parse syslog messages:

val re = """(\w{3}\s+\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r
val syslog = sc.textFile("/var/log/syslog")
val entries = syslog.collect { case re(timestamp, hostname, process, pid, message) =>
    Map("timestamp" -> timestamp, "hostname" -> hostname,
      "process" -> process, "pid" -> pid, "message" -> message)
}

The format of syslog varies from system to system - adjust the regex to match yours.
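
If you want a quick sanity check that the regex matches, compare the parsed entry count against the raw line count (just a rough check using the two RDDs):

// count raw lines vs. successfully parsed entries
val total = syslog.count()
val parsed = entries.count()
println("parsed " + parsed + " of " + total + " lines")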

Write it to elasticsearch:

val writables = entries.map(toWritable)
val output = writables.map { v => (NullWritable.get : Object, v : Object) }
sc.hadoopConfiguration.set("es.resource", "syslog/entry")
output.saveAsHadoopFile[ESOutputFormat]("-")

If all goes well, you should see a bunch of messages from Spark, finishing with:

13/11/13 09:40:25 INFO spark.SparkContext: Job finished: saveAsHadoopFile at <console>:30, took 4.046905584 s

Now hit Elasticsearch at localhost:9200/syslog/_search, and voilà, your syslogs:

{
   "took": 7,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 1572,
      "max_score": 1,
      "hits": [
         {
            "_index": "syslog",
            "_type": "entry",
            "_id": "ThOMFd8TSViYMtEOeq6eBw",
            "_score": 1,
            "_source": {
               "hostname": "oolong",
               "process": "pulseaudio",
               "timestamp": "Nov 12 14:34:15",
               "message": "[alsa-sink] alsa-sink.c: We were woken up with POLLOUT set -- however a subsequent snd_pcm_avail() returned 0 or another value < min_avail.",
               "pid": "2139"
            }
         },
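
You can query on any of the indexed fields too - for example, to pull out just the pulseaudio entries seen above:

$ curl 'http://localhost:9200/syslog/_search?q=process:pulseaudio&pretty=true'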

You can download a complete project of this tutorial from github.com/barnybug/spark-elasticsearch-blogpost.