Scala Future and the Elasticsearch Java API

Most examples of the Elasticsearch Java API you’ll have seen follow the prepare/execute/actionGet pattern (in Scala):

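import java.util.Date
import org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder

// client is an already-constructed org.elasticsearch.client.Client
val response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elastic Search")
                    .endObject()
                  )
        .execute()
        .actionGet()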

This blocks in actionGet().

If you’re developing in a reactive environment (e.g. Akka or Play), blocking is a no-no. And even if you’re not, avoiding blocking is a win: you can parallelize indexing to eke out the best performance.

One possibility would be to wrap the call in future { ... }, but since Elasticsearch already submits the request to one of its own thread pools, there should be no need: the wrapper ties up another thread simply to block.

A neat trick is you can implement the ActionListener interface, and have the listener callback ‘push’ the result through a Promise:

import org.elasticsearch.action.{ActionListener, ActionRequestBuilder, ActionResponse}
import scala.concurrent.{Future, Promise}

/**
 * Adapts an org.elasticsearch.action.ActionListener to a scala Future.
 * This avoids needing to block an extra thread on the elasticsearch Action.
 */
class ActionListenerAdapter[T <: ActionResponse] extends ActionListener[T] {
  val promise = Promise[T]()

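  // Called by Elasticsearch when the action succeeds.
  def onResponse(response: T): Unit = {
    promise.success(response)
  }

  // Called by Elasticsearch when the action fails.
  def onFailure(e: Throwable): Unit = {
    promise.failure(e)
  }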

  def executeFuture[RB <: ActionRequestBuilder[_,T,_]](request: RB): Future[T] = {
    request.execute(this)
    promise.future
  }
}

The adapter is parameterized on the response type, so it works with any of the *Response types in the API.
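
To see the callback-to-Promise pattern in isolation, here is a minimal sketch that runs without Elasticsearch on the classpath. The Listener trait, fetchAsync, and fetchFuture are hypothetical names invented for this illustration; Listener only mimics the shape of ActionListener and is not part of any library.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object PromiseAdapterSketch {
  // Hypothetical stand-in for a callback-style listener such as ActionListener.
  trait Listener[T] {
    def onResponse(response: T): Unit
    def onFailure(e: Throwable): Unit
  }

  // Hypothetical callback-based API: does its work on its own thread and
  // reports the outcome through the listener, never through a return value.
  def fetchAsync(id: Int, listener: Listener[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit =
        if (id >= 0) listener.onResponse(s"doc-$id")
        else listener.onFailure(new IllegalArgumentException(s"bad id: $id"))
    })
    worker.start()
  }

  // The adapter: each callback completes the Promise; callers only see the Future.
  def fetchFuture(id: Int): Future[String] = {
    val promise = Promise[String]()
    fetchAsync(id, new Listener[String] {
      def onResponse(response: String): Unit = promise.success(response)
      def onFailure(e: Throwable): Unit = promise.failure(e)
    })
    promise.future
  }

  def main(args: Array[String]): Unit = {
    // Await is used only so the sketch shows a result; real code would map or flatMap.
    println(Await.result(fetchFuture(1), 5.seconds))
  }
}
```

The calling thread never waits inside fetchFuture: it returns as soon as the request has been handed off, and the Promise is completed later by whichever thread invokes the callback.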

Then listener.executeFuture gets you a Future you can map, etc. as usual. So the above index call becomes:

// map on a Future needs an implicit ExecutionContext in scope
import scala.concurrent.ExecutionContext.Implicits.global

val request = client.prepareIndex("twitter", "tweet", "1")
        .setSource("""{"text": "my document"}""")
val listener = new ActionListenerAdapter[org.elasticsearch.action.index.IndexResponse]
listener.executeFuture(request) map { result =>
    // do something asynchronously with the response
}
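
Because each call now returns a Future, several index requests can be in flight at once and collected with Future.sequence. The sketch below assumes a hypothetical indexDoc helper standing in for listener.executeFuture(request); it fakes the work on a plain thread so the example runs on its own.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelIndexSketch {
  // Hypothetical stand-in for listener.executeFuture(request): returns at once
  // and completes the Future later from another thread.
  def indexDoc(id: Int): Future[String] = {
    val promise = Promise[String]()
    new Thread(new Runnable {
      def run(): Unit = promise.success(s"indexed-$id")
    }).start()
    promise.future
  }

  // Every request is started before any result is awaited.
  // Future.sequence gathers them into one Future and keeps the input order.
  def indexAll(ids: List[Int]): Future[List[String]] =
    Future.sequence(ids.map(indexDoc))

  def main(args: Array[String]): Unit = {
    // Blocking here only so the sketch terminates with visible output.
    println(Await.result(indexAll(List(1, 2, 3)), 5.seconds))
  }
}
```

If any single request fails, the combined Future fails with that exception, so code that needs per-document error handling would recover each Future before sequencing.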

Non-blocking code. Bing!