Spark and Elasticsearch

Elastic Sparkle

If you work in the Hadoop world and have not yet heard of Spark, drop everything and go check it out. It’s a really powerful, intuitive and fast map/reduce system (and then some).

Where it beats Hadoop/Pig/Hive hands down is that it’s not a massive stack of quirky DSLs built on top of layers of clunky Java abstractions. It’s a simple, pure Scala functional DSL with all the flexibility and succinctness of Scala. And it’s fast and properly interactive: query, bam, response. Not query, twiddle fingers, wait a bit... response.

And if you’re into search, you’ll no doubt have heard of Elasticsearch - a distributed RESTful search engine built on Lucene.

They’re perfect bedfellows - crunch your raw data and spit it out into a search index ready for serving to your frontend. At the company I work for we’ve built the google-analytics-esque part of our product around this combination.

It’s so fast, it flies - we can process raw event logs at 250,000 events/s without breaking a sweat on a meagre EC2 m1.large instance.

Anyway, enough babbling… to the demo…

If you just want to get running with a simple working project, check out a complete project of this tutorial at


They’re both dead easy to install.

Get Elasticsearch from here (0.90.6 was latest when this article was written):

$ wget

Get Spark from here:

$ wget

Get my patched Elasticsearch hadoop plugin from (I haven’t got Spark working with the latest release yet):

$ wget

Unpack everything

$ tar zxvf elasticsearch-0.90.6.tar.gz 
$ tar zxvf spark-0.8.0-incubating-bin-cdh4.tgz


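Start Elasticsearch:

$ cd elasticsearch-0.90.6
$ bin/elasticsearch

Then, back in the directory where you unpacked everything, start the Spark shell with the plugin jar on its classpath:

$ cd spark-0.8.0-incubating-bin-cdh4
$ ADD_JARS=../elasticsearch-hadoop-1.3.0.BUILD-SNAPSHOT.jar ./spark-shell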

Index some sample data with Spark

In the spark shell, create a dataset with a familiar example tweet:

val tweet = Map("user" -> "kimchy", "post_date" -> "2009-11-15T14:12:12", "message" -> "trying out Elastic Search")
val tweets = sc.makeRDD(Seq(tweet))

Define a simple function toWritable to help convert scala Maps to Hadoop Writables:

import org.apache.hadoop.io.{MapWritable, Text, NullWritable}

def toWritable(in: Map[String, String]) = {
  val m = new MapWritable
  // Wrap every key and value in a Hadoop Text
  for ((k, v) <- in)
    m.put(new Text(k), new Text(v))
  m
}

Map the collection to the expected types for the ESOutputFormat (ugly Object -> Object). The key is throwaway, so we just use a NullWritable:

val writables = tweets.map(toWritable)
val output = writables.map { v => (NullWritable.get : Object, v : Object) }

Finally, save the dataset as an ESOutputFormat, which will index into the running elasticsearch (the output ‘file’ name isn’t used, so we just pass "-"):

import org.elasticsearch.hadoop.mr.ESOutputFormat

sc.hadoopConfiguration.set("es.resource", "tweets/tweet")
output.saveAsHadoopFile[ESOutputFormat]("-")

Search for it

Hit localhost:9200/tweets/_search in your browser to see the indexed tweet.
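If you would rather stay in the Spark shell than switch to a browser, the same check can be sketched with plain Scala. The helper names searchUrl and search are made up for this example, and the defaults assume Elasticsearch is listening on localhost:9200 as in the rest of this article.

```scala
import scala.io.Source

// Hypothetical helper: builds the _search URL for an index.
// Host and port defaults are assumptions matching the article's local setup.
def searchUrl(index: String, host: String = "localhost", port: Int = 9200): String =
  "http://" + host + ":" + port + "/" + index + "/_search"

// Hypothetical helper: fetches the raw JSON response as a String.
// Only works while Elasticsearch is actually running at that address.
def search(index: String): String = {
  val source = Source.fromURL(searchUrl(index))
  try source.mkString finally source.close()
}

// Usage, with Elasticsearch running:
//   println(search("tweets"))
```

This only fetches the raw JSON; pretty-printing or parsing it is left to whatever JSON library you already use.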

Bigger data

Now to put Spark to good use - indexing a bit more data. For want of a better sample, we’re going to index /var/log/syslog.

Parse syslog messages:

val re = """(\w{3}\s+\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r
val syslog = sc.textFile("/var/log/syslog")
val entries = syslog.collect {
  case re(timestamp, hostname, process, pid, message) =>
    Map("timestamp" -> timestamp, "hostname" -> hostname, "process" -> process, "pid" -> pid, "message" -> message)
}

The format of syslog varies system to system - adjust the regex to match yours.
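Before pointing the regex at a whole log file, it can help to try it on a few lines in plain Scala, with no Spark involved. The sketch below uses made-up sample lines, and the names parseLine, sampleLines and relaxed are hypothetical. Seq.flatMap over an Option stands in for the RDD collect used above: lines that don’t match are silently dropped rather than raising an error. The relaxed pattern is one possible adjustment, not a universal fix: it accepts single-digit days, which the original pattern’s \d{2} rejects.

```scala
// Pattern copied verbatim from the article.
val re = """(\w{3}\s+\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r

// Made-up sample lines (assumptions, not taken from a real log).
val sampleLines = Seq(
  "Nov 12 14:34:15 oolong pulseaudio[2139]: [alsa-sink] woken up with POLLOUT set",
  "Nov 12 14:34:16 oolong kernel: no pid in this line",
  "Nov  2 09:00:01 oolong cron[812]: single-digit day"
)

// Hypothetical helper: Some(fields) when the whole line matches, None otherwise.
def parseLine(line: String): Option[Map[String, String]] = line match {
  case re(timestamp, hostname, process, pid, message) =>
    Some(Map("timestamp" -> timestamp, "hostname" -> hostname,
             "process" -> process, "pid" -> pid, "message" -> message))
  case _ => None
}

// Lines that parse, and lines that would be dropped from the index.
val parsed  = sampleLines.flatMap(line => parseLine(line))
val skipped = sampleLines.filter(line => parseLine(line).isEmpty)

// One possible adjustment (an assumption): allow one- or two-digit days.
val relaxed = """(\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r
val relaxedMatches = sampleLines.count(l => relaxed.pattern.matcher(l).matches())
```

Looking at skipped is a quick way to see which line shapes your system produces that the pattern doesn’t cover yet.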

Write it to elasticsearch:

val writables = entries.map(toWritable)
val output = writables.map { v => (NullWritable.get : Object, v : Object) }
sc.hadoopConfiguration.set("es.resource", "syslog/entry")
output.saveAsHadoopFile[ESOutputFormat]("-")

If all goes well you should see a bunch of messages from spark finishing with:

13/11/13 09:40:25 INFO spark.SparkContext: Job finished: saveAsHadoopFile at <console>:30, took 4.046905584 s

Now hit localhost:9200/syslog/_search, and voilà, your syslogs:

{
  "took": 7,
  "timed_out": false,
  "_shards": { "total": 5, "successful": 5, "failed": 0 },
  "hits": {
    "total": 1572,
    "max_score": 1,
    "hits": [
      {
        "_index": "syslog",
        "_type": "entry",
        "_id": "ThOMFd8TSViYMtEOeq6eBw",
        "_score": 1,
        "_source": {
          "hostname": "oolong",
          "process": "pulseaudio",
          "timestamp": "Nov 12 14:34:15",
          "message": "[alsa-sink] alsa-sink.c: We were woken up with POLLOUT set -- however a subsequent snd_pcm_avail() returned 0 or another value < min_avail.",
          "pid": "2139"
        }
      },
      ...

You can download a complete project of this tutorial from