Scala Future and the Elasticsearch Java API

Most examples of the Elasticsearch Java API you’ll have seen follow the prepare/execute/actionGet pattern (in Scala):

val response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elastic Search") .endObject() ) .execute() .actionGet()

This blocks in actionGet().

If you’re developing in a reactive environment (eg. Akka or Play), blocking is a no no. And even if you’re not, it’s a win to be able to avoid blocking so you can parallelize indexing to eek out the best performance.

One possibility would be to wrap the call in future { ... }, but given Elasticsearch is actually submitting this request to one of its own thread pools, there should be no need - this unnecessarily ties up another thread simply to block.

A neat trick is you can implement the ActionListener interface, and have the listener callback ‘push’ the result through a Promise:

import org.elasticsearch.action.{ActionListener, ActionRequestBuilder, ActionResponse} import scala.concurrent.Promise /** * Adapts an org.elasticsearch.action.ActionListener to a scala Future. * This avoids needing to block an extra thread on the elasticsearch Action. */ class ActionListenerAdapter[T <: ActionResponse] extends ActionListener[T] { val promise = Promise[T]() def onResponse(response: T) { promise.success(response) } def onFailure(e: Throwable) { promise.failure(e) } def executeFuture[RB <: ActionRequestBuilder[_,T,_]](request: RB): Future[T] = { request.execute(this) promise.future } }

This is templated on the response type, so it can be applied to any of the *Response api.

Then listener.executeFuture gets you a Future you can map, etc. as usual. So the above index call becomes:

val request = client.prepareIndex("twitter", "tweet", "1") .setSource("""{"text": "my document"}""") val listener = new ActionListenerAdapter[org.elasticsearch.action.index.IndexResponse] listener.executeFuture(request) map { result => // do something asynchronously with the response }

Non-blocking code. Bing!

Comments

blogroll

social