Scala Future and the Elasticsearch Java API
Most examples of the Elasticsearch Java API you’ll have seen follow the prepare/execute/actionGet pattern (in Scala):
val response = client.prepareIndex("twitter", "tweet", "1")
  .setSource(jsonBuilder()
    .startObject()
    .field("user", "kimchy")
    .field("postDate", new Date())
    .field("message", "trying out Elastic Search")
    .endObject()
  )
  .execute()
  .actionGet()
This blocks the calling thread in actionGet() until the response arrives.
If you’re developing in a reactive environment (e.g. Akka or Play), blocking is a no-no. And even if you’re not, it’s a win to avoid blocking so you can parallelize indexing to eke out the best performance.
One possibility would be to wrap the call in future { ... }, but given that Elasticsearch is already submitting this request to one of its own thread pools, there should be no need: wrapping unnecessarily ties up another thread simply to block.
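To make the cost concrete, here is a minimal sketch of the wrapping approach. It does not use the Elasticsearch client: blockingIndex is a hypothetical stand-in for a prepareIndex(...).execute().actionGet() call, and the 50 ms sleep is an arbitrary assumption that simulates waiting on the cluster.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingWrapSketch {
  // Hypothetical stand-in for client.prepareIndex(...).execute().actionGet():
  // it parks the calling thread until the "response" is ready.
  def blockingIndex(id: String): String = {
    Thread.sleep(50) // simulates waiting on Elasticsearch
    s"indexed-$id"
  }

  // The caller is no longer blocked, but a thread from the execution context
  // now sits idle inside blockingIndex for the duration of the request.
  def wrapped(id: String): Future[String] = Future { blockingIndex(id) }
}
```

The caller gets a Future back straight away, but every in-flight request still occupies one pool thread that does nothing except wait.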
A neat trick is to implement the ActionListener interface and have the listener callback ‘push’ the result through a Promise:
import org.elasticsearch.action.{ActionListener, ActionRequestBuilder, ActionResponse}
import scala.concurrent.{Future, Promise}

/**
 * Adapts an org.elasticsearch.action.ActionListener to a Scala Future.
 * This avoids needing to block an extra thread on the Elasticsearch action.
 */
class ActionListenerAdapter[T <: ActionResponse] extends ActionListener[T] {
  val promise = Promise[T]()

  // Called by Elasticsearch when the request succeeds.
  def onResponse(response: T): Unit = {
    promise.success(response)
  }

  // Called by Elasticsearch when the request fails.
  def onFailure(e: Throwable): Unit = {
    promise.failure(e)
  }

  // Submits the request with this listener and returns the Future that the
  // callbacks above will complete.
  def executeFuture[RB <: ActionRequestBuilder[_,T,_]](request: RB): Future[T] = {
    request.execute(this)
    promise.future
  }
}
The adapter is parameterized on the response type, so it works with any of the *Response types in the API.
listener.executeFuture then gives you a Future that you can map, etc. as usual. So the index call above becomes:
val request = client.prepareIndex("twitter", "tweet", "1")
  .setSource("""{"text": "my document"}""")
val listener = new ActionListenerAdapter[org.elasticsearch.action.index.IndexResponse]
// map needs an implicit ExecutionContext in scope
listener.executeFuture(request) map { result =>
  // do something asynchronously with the response
}
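Since each call now yields a Future, several requests can be sent at once and their results gathered with Future.sequence. The sketch below shows the shape of that under stated assumptions. It does not use the Elasticsearch client: Listener and executeAsync are hypothetical stand-ins for ActionListener and request.execute(listener), and the "indexed-N" strings are made-up responses.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelIndexSketch {
  // Hypothetical stand-in for ActionListener[T].
  trait Listener[T] {
    def onResponse(response: T): Unit
    def onFailure(e: Throwable): Unit
  }

  // Hypothetical stand-in for request.execute(listener): the callback fires
  // later on another thread, as it would on an Elasticsearch thread pool.
  def executeAsync(id: Int, listener: Listener[String]): Unit = {
    val worker = new Thread(new Runnable {
      def run(): Unit =
        if (id >= 0) listener.onResponse(s"indexed-$id")
        else listener.onFailure(new IllegalArgumentException(s"bad id $id"))
    })
    worker.start()
  }

  // Bridges the callback to a Future, using one fresh Promise per request.
  def indexFuture(id: Int): Future[String] = {
    val promise = Promise[String]()
    executeAsync(id, new Listener[String] {
      def onResponse(response: String): Unit = promise.success(response)
      def onFailure(e: Throwable): Unit = promise.failure(e)
    })
    promise.future
  }

  // Fires all requests up front, then gathers the results into one Future.
  def indexAll(ids: Seq[Int]): Future[Seq[String]] =
    Future.sequence(ids.map(indexFuture))
}
```

A Promise can only be completed once, so each request needs its own listener instance. With the real adapter that would mean creating a new ActionListenerAdapter per call instead of reusing one.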
Non-blocking code. Bing!