Alternate Futures and Promises for Clojure
At the moment I am experimenting with various techniques to speed up some update queries from our client. One important element of this efforts is making things parallel. E.g. to compute a list update of deltas for the client we need to compare the version of the user on the client and in the database and we need the user data at some other places on the delta computation. So it would be a good idea to initiate the loading of the user data asynchronous right at the beginning of the request processing. While the user data is loaded we can load/compute some other stuff (e.g. we need some data about the competition and the matches). When all the required information is present continue the rest of the the job (e.g. generate some XML or JSON).
Due to the fact that our backend is written in Clojure I took a closer look at the Clojure Future implementation
clojure.core/future. The Clojure implementation is pretty basic and very similar the the underlying Java implementation. This isn’t bad, but rather unsatisfying for a language emphasizing on concurrency like Clojure.The Clojure Future API consists of these functions in the
- Creates a future instance and executes the provided code.
- Creates a future instance and executes a provided function. This function is used by the
- Cancels the further, if possible.
trueif future f is cancelled
- Returns a promise object that can be read with
@, and set, once only.
- Delivers the supplied value to the promise, releasing any pending derefs. A subsequent call to deliver on a promise will throw an exception.
This is a small but sufficient API. The
deliver functions are relatively new and having a “Alpha – subject to change” note.
So how does this work:
(let [f (future (do (java.lang.Thread/sleep 42))] (println "value:" @f) )
You simply wrap the code you want the be execute in parallel in the
(future …) macro and you get a future datatype which reifies
java.util.concurrent.Future. This has some pretty nice effects. The future can be read with
deref and you may use it like any other Java future.
Not so nice is that you have the
(future-cancel …) function or the
cancel() method in the
java.util.concurrent.Future interface. So what is so bad about this? First: This is a mutating function on the reference, so if you pass your future running a important job to someone else, he may simply cancel this job and you can’t prevent this. Second: What happens when you cancel the job? When your long running job has a database update at the end, will the update be executed? Will truncations be rolled back? Connections closed?
That you don’t get noticed when the future processing is finished is somewhat sad too. Due to the fact the functions are first class citizens and closures are all around, it would be great if you could provide e.g. a function as callback when the future was successfully finished. The Scala scala.concurrent.Future implementation offers method like
onFailure(…) where you can supply these kind of handlers. What is also great on Scala Futures is that they offer methods like
flatMap, etc. which are used for the Scala
for comprehension. This enables you to do stuff like this:
for (a <- someExpensiveComputation() b <- hugeLoadFromDatabase()) yield merge (a, b)
This would execute in parallel the
hugeLoadFromDatabase and both results are available in the list comprehension.
The last flaw I see is, that you have no control about the execution. Clojure (1.3) uses the same
Executor used for
(send-off …) to execute the future but you cant’t influence this and you have no control on the executor. In Java there is lot’s of boilerplate, because you’ll have to create an
Executor and submit a
Runnable manually, therefore you have the full control. Scala had also this flaw hiding the execution of a Future but with the rework of Futures and Promises in Scala (SIP-14 an
ExecutionContext was introduced, which solves this issue.
So let’s explore how we can build a Future / Promise library in Clojure where that does not have these issues.
My goal would be to write something like this
(future (load-data-from-somewhere ctx) (on-success [v] (initiate-process v)) (on-failure [cause] (warn cause (format "Unable to load data, retrying in %s seconds" retry-threshold))))
it would be also nice to do something like
(let [f (for [order-data (future (fetch-order oid)) customer-data (future (fetch-customer cid))] (merge-data order-data customer-data))] (on-success f [data] (send-email data)))
This would return a future that aggregates the result of both fetch requests. You would be able to register for the
:on-success event and perform an operation if wanted.
Lets see how far we get …
Definition of Terms
- A Future is a read handle to a value that may be come available some times later. A Future may be read many times.
- A Promise is a write handle for a value that will be available in the later process. You may pass the promise out to someone else to provice a reference to a value that you don’t have yet. A promise may be written only one.
A Future and a Promise – at least in this example – will have this three states:
- The Future/Promise has not yet received a value.
- The Promise/Future has received the a value and the previous computation has to considered successful.
- The Promise/Future has received the a value and the previous computation failed e.g. threw an Exception.
Promise – the basics
First I want to define some artifacts needed for a Promise.
(defn promise  …) (defn status [promise] …)
I guess these are the first very functions needed for a promise. The first one
(promise …) is a basic factory function for the handle and the
(status promise) returns the status. The promise will be a
ref data type, so the value may be obtained by using
deref.To set the value we will define functions like:
(defn success [promise value] …) (defn failure [promise cause] …)
These two function will replace the
(declare …) function of the Clojure original. The
(success …) function will set the Promise value to the passed value and the state of the promise will change to
(failure …) function will indicate that the computation that gave out the Promise failed, the value of the Promise is reflects the cause for the failure and the state of the promise will change to
:failed. Both functions will throw an exception if someone tries to write to the Promise a second time.
How do we implement the handle. I defined a Clojure protocol to have an interface:
(defprotocol Promise (future* [promise]) (try-success* [promise x]) (try-failure* [promise x]) (status* [promise]) )
Pretty simple I guess so here is how it works:
- This function will create a future for this promise. So someone create a promise and pass it to an asynchronous process while passing out a future to another process that will receive the value when the value was written to the promise. In fact in my variant it simply returns itself.
(try-success* [promise x])
- This function try to write a value to the promise and switch to the
(try-failure* [promise x])
- This function try to write a value to the promise and switch to the
- This function returns the current state of the promise.
* at the end of the function name. I use this convention to indicate some private/delegate functions. E.g. the
(success …) function will simply delegate to the
(try-success* [promise x] …). The main difference is that both
(try-…) method returns a boolean value indicating a successful operation while
(success …) returns the supply value and will throw an exception if an error occurs.
The next step is to create an instance:
(defn- notify [x & listeners] (doseq [listener listeners :when listener] (listener x))) (defn- try-complete-promise [^java.util.concurrent.CountDownLatch d v s x complete variable] (if (pos? (.getCount d)) (if (compare-and-set! v nil [s x]) (do (.countDown d) (apply notify x (concat @complete @variable)) true) false))) (defn promise  (let [d (java.util.concurrent.CountDownLatch. 1) v (atom nil) on-complete (create-list on-complete) on-success (create-list on-success) on-failure (create-list on-failure)] (reify Promise (future* [this] this) (try-success* [_ x] (try-complete-promise d v :success x on-complete on-success)) (try-failure* [_ x] (try-complete-promise d v :failure x on-complete on-failure)) (status* [this] (if (.isRealized this) (first @v) :pending )) clojure.lang.IDeref (deref [_] (.await d) (second @v)) clojure.lang.IBlockingDeref (deref [_ timeout-ms timeout-val] (if (.await d timeout-ms java.util.concurrent.TimeUnit/MILLISECONDS) (second @v) timeout-val)) clojure.lang.IPending (isRealized [this] (zero? (.getCount d))) )))
The previously announced factory function simply uses
reify to create a new instance. The state is held as closure by the defined
let bindings. Primarily important are
d – a
CountDownLatch to prevent multiple writes and handle synchronization – and
v an atom holding the value. The value of
v will be an array containing the status (the first element) and the supplied value (the second element).
To read the value of the Promise the reified instance also implements
clojure.lang.IBlockingDeref. This offers the possibility to read the promise by using
deref or the
@ reader macro. This works exactly like the original Clojure promise and to be honest the source is also very similar …
Additionally there is the possibility to pass some listener functions which will be invoked with the value (or cause) when the promise is written. This will be used to the Future realization later on.
If you are still curious have a look at the next post.