Chaining async calls using Java Futures

  • By Tech Week
  • May 16, 2019

by Tudor MIHORDEA and Ovidiu LUPU

 

Introduction to Asynchronous Programming

The JVM was designed around 1:1 multithreading: each Java thread maps directly to an operating system thread, so blocking a thread just to wait for hardware I/O (filesystem operations, network calls) to finish wastes a relatively expensive resource. This problem led to asynchronous APIs such as NIO, which only launch an operation on the current thread and return immediately instead of blocking, relying on events or callbacks to let the developer react when the operation finishes. Asynchronous programming means working with these kinds of libraries, and with UI frameworks, which operate on a similar principle.

 

Previous Approaches

Writing asynchronous code is always a bit tricky. In the past we used to make asynchronous calls and provide a callback to handle the reply. This approach worked fine until we had to start combining multiple asynchronous calls and handling error scenarios, a situation that became known as callback hell (a term that, I believe, appeared in the JavaScript world). One problem with callbacks is that the style encourages functions without a result (returning void), which makes it very hard to apply a chain of transformations after I/O calls, which usually return data you want to process. Even dealing with independent callbacks is not easy: a lot of time can be wasted figuring out what happens to the other callbacks if an exception is thrown inside one of them. The biggest problem, however, is reasoning about the state of the program when many callbacks, each with no return type, mutate shared state. When the order in which they are called is unclear, understanding, debugging and unit testing the code becomes quite troublesome. The nesting this leads to is sketched below.
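To make the problem concrete, here is a minimal sketch of what chaining three callback-based calls can end up looking like. The Callback interface and the fetchToken, requestCopy and process methods are hypothetical, not taken from any particular library:

// Hypothetical callback-style API: every method returns void and reports
// its outcome through the supplied callback.
interface Callback<T> {
    void onSuccess(T result);
    void onFailure(Throwable error);
}

void fetchToken(Callback<String> callback) { /* ... */ }
void requestCopy(String token, Callback<String> callback) { /* ... */ }
void process(String path, Callback<Void> callback) { /* ... */ }

// Chaining the three calls forces the nesting and the duplicated error
// handling that people call callback hell.
void run() {
    fetchToken(new Callback<String>() {
        public void onSuccess(String token) {
            requestCopy(token, new Callback<String>() {
                public void onSuccess(String path) {
                    process(path, new Callback<Void>() {
                        public void onSuccess(Void ignored) { /* done */ }
                        public void onFailure(Throwable error) { /* handle */ }
                    });
                }
                public void onFailure(Throwable error) { /* handle */ }
            });
        }
        public void onFailure(Throwable error) { /* handle */ }
    });
}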

 

Futures

Now we can see what Futures are and how they address these problems. A Future (also called a promise, task or deferred, depending on the programming language) is a proxy for an asynchronous computation that returns a result of a specific type. Making Future a type means an asynchronous computation is a first-class object: it can be passed to other functions and returned as a result, which makes imposing an order on callbacks a solvable problem. The result type also makes chains of transformations easy via functions like map, which transform the result into a result of another type using a strongly typed and unit-testable function. Saving the best for last, the biggest advantage of Futures is composability. You might imagine that transformations which are themselves asynchronous would force you to somehow extract your result from a mess that looks like Future<Future<…<Future<T>>…>>. Methods like thenCompose mean that any sequence of asynchronous operations is handled like a single asynchronous operation in the rest of your program, and this is what makes reasoning about and working with these operations much easier.
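As a minimal, self-contained sketch (the fetchUserName and countOrdersAsync methods are made up for illustration), this is roughly how the two operations look on CompletableFuture, where the map-style transformation is called thenApply:

import java.util.concurrent.CompletableFuture;

public class FutureTransformations {

    // Hypothetical asynchronous call: fetches a user's display name.
    static CompletableFuture<String> fetchUserName(long id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Hypothetical asynchronous call: counts the user's orders.
    static CompletableFuture<Integer> countOrdersAsync(String userName) {
        return CompletableFuture.supplyAsync(() -> userName.length());
    }

    public static void main(String[] args) {
        // thenApply is the "map": a plain, unit-testable function transforms
        // the result type (String -> Integer) without any blocking.
        CompletableFuture<Integer> nameLength =
                fetchUserName(42).thenApply(String::length);

        // thenCompose is the "flatMap": chaining another asynchronous call
        // yields CompletableFuture<Integer>, not the nested
        // CompletableFuture<CompletableFuture<Integer>>.
        CompletableFuture<Integer> orderCount =
                fetchUserName(42).thenCompose(name -> countOrdersAsync(name));

        System.out.println(nameLength.join() + " " + orderCount.join());
    }
}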

One issue with complex workflows is that you might have a mixture of CPU-intensive and I/O-intensive steps. The most common way to handle this is to use two thread pools: one with a large number of threads for I/O and another with one thread per physical core for CPU-intensive work. This means that, in addition to specifying the order of the steps, we also have to specify where each function should be executed. Futures help here as well, because supplyAsync and the other *Async methods accept an Executor, giving fine-grained control over where each function runs.
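A minimal sketch of the two-pool setup, assuming hypothetical readRecordFromDisk and expensiveParse helpers and illustrative pool sizes; supplyAsync and the *Async continuation methods take the Executor to run on as an extra argument:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MixedWorkload {

    // One pool sized for blocking I/O, one sized to the number of cores.
    static final ExecutorService ioPool = Executors.newFixedThreadPool(64);
    static final ExecutorService cpuPool =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    public static void main(String[] args) {
        CompletableFuture<Integer> result =
                // the I/O-bound step runs on the large I/O pool...
                CompletableFuture.supplyAsync(MixedWorkload::readRecordFromDisk, ioPool)
                // ...while the CPU-bound transformation is routed to the CPU
                // pool via the *Async variant that takes an Executor.
                .thenApplyAsync(MixedWorkload::expensiveParse, cpuPool);

        System.out.println(result.join());
        ioPool.shutdown();
        cpuPool.shutdown();
    }

    // Hypothetical stand-ins for real I/O- and CPU-intensive work.
    static String readRecordFromDisk() { return "42"; }
    static int expensiveParse(String record) { return Integer.parseInt(record); }
}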

 

Example

I will use a scenario I ran into recently to showcase the use of futures. I needed to compute some statistics from a set of data. The problem was that the data was not directly accessible to me (it lived in a different data center). To get it, I had to use an existing data replication API to copy the data into the local data center. This API required JWT authentication, and to obtain a JWT token I had to make another call to an authentication server. Here is how the whole thing was supposed to work:

  1. Call the authentication server to get an authentication token. This call is authenticated with Kerberos (the data replication API did not have a Kerberos option) and returns a JWT token.
  2. Call the data replication API to request that the needed data be copied, sending the token received in step 1. The API returns a request id.
  3. Wait for the data to be copied. An endpoint in the data replication API returns the status of the request, so by polling this endpoint we can determine whether the data was successfully copied. Once completed, it also returns the path in the local cluster.
  4. Do the actual processing on the data, now that it is locally available.

 

There might be multiple instances of this data processing workflow that need to run in parallel, and each data copying request might take a different amount of time, so each step has to be handled asynchronously. Futures have been available in Java since version 5, but you couldn't combine or chain them, which made them far less useful. This was fixed with the introduction of CompletableFuture in Java 8, and that is what we will use in our example. For the HTTP calls we can use any of the existing asynchronous HTTP request libraries. Here are the definitions for the 4 steps:

 

public CompletableFuture<String> getTokenAsync();
public CompletableFuture<String> makeDataRequestAsync(String token);
public CompletableFuture<String> waitForDataAsync(String requestId, String token);
public CompletableFuture<Void> processDataAsync(String path);
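
As an illustration, here is a hedged sketch of how getTokenAsync could be implemented with the JDK 11 java.net.http.HttpClient (any asynchronous HTTP library would do); the endpoint URL is a placeholder and the Kerberos authentication from step 1 is omitted:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.concurrent.CompletableFuture;

class AuthClient {

    private final HttpClient httpClient = HttpClient.newHttpClient();

    // Sketch only: the URL is a placeholder and the Kerberos (SPNEGO)
    // handshake for this call is left out for brevity.
    public CompletableFuture<String> getTokenAsync() {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("https://auth.example.com/token"))
                .GET()
                .build();
        return httpClient
                .sendAsync(request, HttpResponse.BodyHandlers.ofString())
                .thenApply(HttpResponse::body); // the response body is the JWT token
    }
}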

 

Next, we needed to compose the steps. For this we used the thenCompose method of CompletableFuture. The result is a new CompletableFuture:

 

public CompletableFuture<Void> process() {
    return getTokenAsync()
        .thenCompose(token -> makeDataRequestAsync(token))
        .thenCompose(requestId -> waitForDataAsync(requestId))
        .thenCompose(path -> processDataAsync(path));
}

 

Now this code looks neat and it's self-explanatory. But… I cheated a bit. As you might have noticed, I am missing the token in the 3rd step. Let's add the token to the waitForDataAsync call:

 

public CompletableFuture<Void> process() {
    return getTokenAsync()
        .thenCompose(token -> makeDataRequestAsync(token)
            .thenCompose(requestId -> waitForDataAsync(requestId, token))
        )
        .thenCompose(path -> processDataAsync(path));
}

 

The code is still quite clean, but I had to add a new level of nesting. This was required because in step 3, besides the requestId, I also needed the token returned from the first step.

To go one step further with Future chaining, let's see how we can do the polling on the status endpoint of the data replication API:

 

public CompletableFuture<Status> getRequestStatus(String requestId, String token);

public CompletableFuture<String> waitForDataAsync(String requestId, String token) {
    return getRequestStatus(requestId, token).thenCompose(status -> {
        if (!status.isDone()) {
            // add wait here
            return waitForDataAsync(requestId, token);
        } else {
            return CompletableFuture.completedFuture(status.getPath());
        }
    });
}

 

Again, using Futures keeps the code very clear: get the current status and, if we are not ready, wait a bit and try again; otherwise return the path where the data was copied. One note here: to avoid a stack overflow error, getRequestStatus should actually schedule the request using an Executor instead of completing on the caller's stack.
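A minimal sketch of one way to achieve both the wait and the stack safety, using CompletableFuture.delayedExecutor from Java 9 (an assumption, not the original approach of scheduling inside getRequestStatus); the one-second interval is also illustrative:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public CompletableFuture<String> waitForDataAsync(String requestId, String token) {
    return getRequestStatus(requestId, token).thenCompose(status -> {
        if (status.isDone()) {
            return CompletableFuture.completedFuture(status.getPath());
        }
        // Re-check later: the recursive call runs on the delayed executor's
        // thread with a fresh stack, roughly one second from now, so long
        // polling chains cannot overflow the stack.
        return CompletableFuture
                .supplyAsync(() -> "retry",
                        CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS))
                .thenCompose(ignored -> waitForDataAsync(requestId, token));
    });
}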

 

Scala

Scala is another language that runs on the JVM which, in addition to being able to use any Java library, offers a more powerful type system and an ecosystem of libraries that encourages functional programming techniques.

Future is also in the Scala standard library, but there the functions dealing with collections of futures are typesafe, because the type system supports higher-kinded types; this also allows these kinds of abstractions to integrate seamlessly with language constructs like for comprehensions. This is how the code from the previous section would look in Scala:

 

for {
  token     <- getTokenAsync()
  requestId <- makeDataRequestAsync(token)
  path      <- waitForDataAsync(requestId, token)
  _         <- processDataAsync(path)
} yield ()

 

In addition to the improvements to the Future type, Scala also has other types which model asynchronous operations. One such example is IO, which provides more features like lazy evaluation, resource management and cancellation.

Advanced language features are nice, but usually not worth it if they only help with the code you write yourself. In real-world projects, most of the code you interact with lives in the standard library of your language or in third-party libraries. The big plus of Scala is that abstractions like Future are not used in a vacuum; they are the interface to third-party libraries. Integrating a new library into your project is much easier when everything uses a common set of abstractions and types, reducing the amount of glue code you need to write, maintain and debug.

 

Conclusions

Futures are worth using for multiple reasons. Not only do they solve many of the problems associated with callbacks, they are also a design pattern, a common abstraction that has made its way into multiple languages. An advantage of learning an abstraction, rather than how a specific language solves a problem, is that it becomes easier to ramp up on another project that uses a different technology stack.

Another benefit of using higher-level libraries is that your programs can get performance optimizations for free thanks to improvements in the implementation. For example, some implementations of Scala Futures try to run a composed continuation on the same executor thread instead of submitting it to the Executor as a separate Runnable.
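On the Java side this trade-off is exposed explicitly: the non-Async methods may run a continuation on the thread that completed the previous stage, while the *Async variants always resubmit it to a pool. A small sketch that prints which thread runs each step:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncVariants {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);

        CompletableFuture.supplyAsync(() -> "data", pool)
                // thenApply may reuse the thread that completed the previous
                // stage, avoiding an extra handoff to the executor...
                .thenApply(s -> Thread.currentThread().getName() + " mapped " + s)
                // ...while thenApplyAsync always resubmits the function, here
                // to the same pool.
                .thenApplyAsync(s -> Thread.currentThread().getName() + " remapped " + s, pool)
                .thenAccept(System.out::println)
                .join();

        pool.shutdown();
    }
}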

Abstractions like Future should always be viewed as just one tool in your toolbox, not as a silver bullet that solves every asynchronous programming problem you might encounter. If your tasks are computationally intensive but not I/O intensive, applying these techniques will only make things worse. The steps in the workflow described in the example are a variation of the (1 producer, 1 consumer, 1 value) problem template, but there are many other types of problems where different abstractions are more suitable: streams and observables when you have multiple values separated in time, actors when you need bidirectional asynchronous communication, and the list goes on.
