Spark and Elasticsearch
Elastic Sparkle
If you work in the Hadoop world and have not yet heard of Spark, drop everything and go check it out. It’s a really powerful, intuitive and fast map/reduce system (and then some).
Where it beats Hadoop/Pig/Hive hands down is that it’s not a massive stack of quirky DSLs built on top of layers of clunky Java abstractions - it’s a simple, pure Scala functional DSL with all the flexibility and succinctness of Scala. And it’s fast and properly interactive - query, bam, response - not query, twiddle fingers, wait a bit… response.
And if you’re into search, you’ll no doubt have heard of Elasticsearch - a distributed RESTful search engine built on top of Lucene.
They’re perfect bedfellows - crunch your raw data and spit it out into a search index ready for serving to your frontend. At the company I work for we’ve built the google-analytics-esque part of our product around this combination.
It’s so fast, it flies - we can process raw event logs at 250,000 events/s without breaking a sweat on a meagre EC2 m1.large instance.
Anyway, enough babbling… to the demo…
If you just want to get running with a simple working project, check out the complete project for this tutorial at github.com/barnybug/spark-elasticsearch-blogpost.
Download
They’re both dead easy to install.
Get Elasticsearch from here (0.90.6 was latest when this article was written):
$ wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-0.90.6.tar.gz
Get Spark from here:
$ wget http://spark-project.org/download/spark-0.8.0-incubating-bin-cdh4.tgz
Get my patched Elasticsearch Hadoop plugin from github.com/barnybug/spark-elasticsearch-blogpost (I haven’t got Spark working with the latest release yet):
$ wget http://github.com/barnybug/spark-elasticsearch-blogpost/raw/master/lib/elasticsearch-hadoop-1.3.0.BUILD-SNAPSHOT.jar
Unpack everything
$ tar zxvf elasticsearch-0.90.6.tar.gz
$ tar zxvf spark-0.8.0-incubating-bin-cdh4.tgz
Run
$ cd elasticsearch-0.90.6
$ bin/elasticsearch
$ cd ../spark-0.8.0-incubating-bin-cdh4
$ ADD_JARS=../elasticsearch-hadoop-1.3.0.BUILD-SNAPSHOT.jar ./spark-shell
Index some sample data with Spark
In the Spark shell, create a dataset with a familiar example tweet:
val tweet = Map("user" -> "kimchy", "post_date" -> "2009-11-15T14:12:12", "message" -> "trying out Elastic Search")
val tweets = sc.makeRDD(Seq(tweet))
Define a simple function toWritable to help convert Scala Maps to Hadoop Writables:
import org.apache.hadoop.io.{MapWritable, Text, NullWritable}

// copy each key/value pair of the Scala Map into a Hadoop MapWritable as Text
def toWritable(in: Map[String, String]) = {
  val m = new MapWritable
  for ((k, v) <- in)
    m.put(new Text(k), new Text(v))
  m
}
Map the collection to the expected types for the ESOutputFormat (an ugly Object -> Object pair). saveAsHadoopFile wants a key/value RDD, and the key is throwaway here, so we just use a NullWritable:
val writables = tweets.map(toWritable)
val output = writables.map { v => (NullWritable.get : Object, v : Object) }
Finally, save the dataset with ESOutputFormat, which will index it into the running Elasticsearch (the output ‘file’ name isn’t used, so we just pass "-"):
import org.elasticsearch.hadoop.mr.ESOutputFormat
sc.hadoopConfiguration.set("es.resource", "tweets/tweet")
output.saveAsHadoopFile[ESOutputFormat]("-")
Search for it
Hit localhost:9200/tweets/_search in your browser to see the indexed tweet.
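Or from the command line (assuming Elasticsearch is on the default port 9200; ?pretty just formats the JSON response):

$ curl 'localhost:9200/tweets/_search?pretty'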
Bigger data
Now to put Spark to good use - indexing a bit more data. For want of a better sample, we’re going to index /var/log/syslog.
Parse syslog messages:
val re = """(\w{3}\s+\d{2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r
val syslog = sc.textFile("/var/log/syslog")
val entries = syslog.collect { case re(timestamp, hostname, process, pid, message) =>
  Map("timestamp" -> timestamp, "hostname" -> hostname,
      "process" -> process, "pid" -> pid, "message" -> message)
}
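Before indexing, it’s worth a quick sanity check that the regex actually matched something (this kicks off the Spark job; a result of 0 means the pattern didn’t fit your logs):

entries.count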
The format of syslog varies from system to system - adjust the regex to match yours.
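For instance, BSD-style syslog pads single-digit days with a space ("Nov  2" rather than "Nov 02"), which the \d{2} day above won’t match. A looser variant (a sketch - untested against your particular logs):

val re = """(\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) (\w+) (\S+)\[(\d+)\]: (.+)""".r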
Write it to elasticsearch:
val writables = entries.map(toWritable)
val output = writables.map { v => (NullWritable.get : Object, v : Object) }
sc.hadoopConfiguration.set("es.resource", "syslog/entry")
output.saveAsHadoopFile[ESOutputFormat]("-")
If all goes well, you should see a bunch of messages from Spark, finishing with:
13/11/13 09:40:25 INFO spark.SparkContext: Job finished: saveAsHadoopFile at <console>:30, took 4.046905584 s
Now hit Elasticsearch at localhost:9200/syslog/_search, and voilà - your syslogs:
{
  "took": 7,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "failed": 0
  },
  "hits": {
    "total": 1572,
    "max_score": 1,
    "hits": [
      {
        "_index": "syslog",
        "_type": "entry",
        "_id": "ThOMFd8TSViYMtEOeq6eBw",
        "_score": 1,
        "_source": {
          "hostname": "oolong",
          "process": "pulseaudio",
          "timestamp": "Nov 12 14:34:15",
          "message": "[alsa-sink] alsa-sink.c: We were woken up with POLLOUT set -- however a subsequent snd_pcm_avail() returned 0 or another value < min_avail.",
          "pid": "2139"
        }
      },
      ...
    ]
  }
}
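Elasticsearch also accepts Lucene query-string searches, so you can filter straight from the command line - for example, to pull out just the pulseaudio entries (assuming, as here, some were indexed):

$ curl 'localhost:9200/syslog/_search?q=process:pulseaudio&pretty'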
You can download the complete project for this tutorial from github.com/barnybug/spark-elasticsearch-blogpost.