FutureHelper is a utility library that with a collection of static library methods to help with asynchronous programming using the Java language Promise API (CompletableFuture
/CompletionStage
) and Vert.x asynchronous programming APIs.
Facilities include:
- Bridging between Java 8's
CompletableFuture
and Vert.xAsyncResult
type handlers. - Missing workflows for Java 8's
CompletableFuture
. - Missing workflows for Vert.x 4
Promise
. - Helpers to manage promises (both Java and Vert.x) in Java 8's streams.
- Simplified timer setup and invocation using Java 8's
Timer
class.
FutureHelper is accessible using JitPack. To use it, in your pom.xml
file add the JitPack repository:
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>
Then add the dependency with the version you want to use:
<dependency>
<groupId>com.github.cloudonix</groupId>
<artifactId>java-future-helper</artifactId>
<version>3.0.6</version>
</dependency>
All of the methods are static methods in the class named Futures
. [Note: the information in this section may be out of date. Consult the Javadoc for the full details]
These helpers help to generate CompletableFuture
instances that are already completed synchronously. This is useful to propagate synchronous failures from an async chain and in a few other cases. Newer Java releases also offer similar helpers, but even working with newer Java versions you might still be intersted in using these less verbose helpers:
Create a CompletableFuture<T>
(for any required T
) that has already "completed exceptionally" with the specified error.
Create a CompletableFuture<Void>
that has already completed with the Void
value.
Create a CompletableFuture<T>
that has already completed with the specified value.
Create a CompletableFuture<T>
that will complete in the future with the value returned by the specified supplier, or complete exceptionally with the Throwable
thrown from the supplier. The supplier will be run in the default async executor (the common fork-join pool).
The ThrowingSupplier
interface is a functional interface that behaves like a Java 8 Supplier
but allows the get()
method to throw any exception so it is easier to propagate failures.
The missing "catch" syntax for Java 8 asynchronous completions - this helper that is intended to be used in CompletableFuture.exceptionally()
async error handlers to easily capture and handle (recover from) known exception types - exactly as the throw...catch
syntax is used in synchronous Java to capture specific errors while letting other errors propagate easily.
The parameters passed would be the type of exception to capture and a handler that receives the captured exception (if such an exception type is thrown) and is expected to recover with the value type the completion stage is expected to return. The handler can throw any exception to signal that it wants to propagate another (or the same) error up the stack (or down the chain).
An example of using this functionality to return a default value from a supplier that failed to find a value using an IO-bound data access might look like this:
public String readValue(String defValue) {
return tryToGetValue()
.exceptionally(Futures.on(DataAccessException.class, e -> defValue));
}
An example of how to convert a mechanic error thrown by an underlying library to a logical exception might look like this:
public String readValue() {
return tryToGetValue()
.exceptionally(Futures.on(DataAccessException.class, e -> {
throw new FailedToRetrieveValue("Data access failed", e);
});
}
Allows to add a delay in the middle of an asynchronous chain of completions. This method generates a function that can be used in CompletableFuture.thenCompose()
to forward a value from one completion to another, inducing a specified delay (in milliseconds).
Example usage:
api.createSomeResource()
// give the resource some time to complete initialization
.thenCompose(Futures.delay(500))
.thenCompose(resource -> api.useResource(resource));
Wrapper for CompletableFuture.anyOf()
that takes a stream instead of array.
Wrapper for CompletableFuture.allOf()
that takes a stream instead of an array.
Wrapper for CompletableFuture.allOf()
that takes a list instead of an array.
A re-definition of CompletableFuture.anyOf()
that operates on a stream, with slightly better semantics:
- The returned (typed) promise will resolve with the value of the first promise that completes successfully, if any, in chronological order.
- If any (but not all) promises fail (complete exceptionally), they will be ignored.
- If all promises fail, the last failure (chronologically) will be used to fail the returned promise.
- If the stream has no elements, then the return promise will fail with a
NoSuchElement
exception.
The same as the above, but takes a list instead of a stream.
A re-definition of CompletableFuture.allOf()
that operats on a stream and returns a promise (completion stage) whose value is the list of the results (in order) of all the completions of promises in the stream. Like CompletableFuture.allOf()
, if any of the promises fail to complete (completes exceptionally), the returned promise will fail (complete exceptionally) with the first exception encountered (chronologically, not in order).
The same as the above, but takes a list instead of a stream.
The same as the above, but takes an array instead of a stream or a list.
Operate on a list of values that will be each submitted to the async function to perform some asynchrnonous operation and the returned promise will complete when all operations have completed. This method is essentially a wrapper on top of CompletableFuture.allOf()
so its behavior in the face of failures is the same as that.
Similar to Futures.executeAllAsync()
, this method takes a list of values and an async function and feeds the list to the operation - but additionally this method returns a promise of a list of all the results of the operations in the order they were submitted.
This method generates a Collector
that can be used with Stream.collect()
to convert a stream of promises (CompletableFuture
) to a promise for a list of values returned from all of the successfully resolved promises. This method uses Futures.resolveAll()
internally and has the same semantics in the face of failures - i.e. the returned CompletableFuture
will complete exceptionally with the semantics specified for Futures.resolveAll()
.
Example usage:
IntStream.range(0, 10).mapToObj(dao::findNameById).collect(Futures.resolvingCollector())
.thenAccept(listOfNames -> ...)
This method generates a Collector
, similar to Futures.resolvingCollector()
, but resolving directly to a stream of values. This collector assumes that no promise in the incoming stream will fail, and if any do fail - trying to access the value of the failed promise will result in a CompletionException
being thrown with the original failure set as its cause()
. Note that the stream API does not allow one to recover other elements of a stream that has such failed.
These methods help to interact with Vert.x asynchronous methods that take a callback in the form of Handler<AsyncResult>
to integrate with code that uses CompletableFuture
chaining.
Wrap a Vert.x asynchronous call that takes a callback and convert it to a Completablefuture.
Due to defficiencies in Java generic resolution, code that uses fromAsync()
often has to specify the generic type. For example, getting a Lock
from Vert.x shared data API might look like this:
Futures.<Lock>fromAsync(cb -> Vertx.vertx().sharedData()
.getLock("lock", cb)).thenCompose(v -> ...);
Futures.retryAsyncIf(Consumer<Handler<AsyncResult<T>>> action, Predicate<Throwable> predicate, int tries)
Similar to the simple Futures.fromAsync()
, this method wraps a Vert.x asynchronous callback API but offers additional logic to retry the operation a few times if it fails.
In addition to the lambda, this method takes a predicate that can test exceptions and a retry limit. If the wrapped call fails, the predicate will be tested with the exception and if it returns true
- the operation will be retries again, as many times as specified in the tries
parameter.
Similar to Futures.fromAsync()
but meant to work with Vert.x APIs that need just a Handler<T>
callback (i.e. they cannot fail).
This method generates a function that can forward completions (whether successful or exceptional) from CompletableFuture.handle()
to Vert.x Future.handle()
for cases when you want to propagate the results from a CompletableFuture
chain back into a Vert.x set of callbacks implemented with Future
.
Example usage (taken from Vert.x core documentations):
FileSystem fs = vertx.fileSystem();
Future<Void> vertxFuture = fs
.compose(data -> {
// When data is available, write it to the file
return fs.writeFile("/foo", data);
})
.compose(v -> {
// When the file is written (fut2), execute this:
return fs.move("/foo", "/bar");
});
completableFutureAPI.readDataBuffer().handle(Futures.forward(vertxFuture::handle));
The Promises
method library is currently a playground for implementing workflows around Vert.x Promise
/ Future
APIs. It currently includes at least the following:
The above methods consume an array/list/stream of promises and resolve all of them to a single promise that will resolve to a list of results. The basic premise is that on success you'd get a list (in order) of all the resolutions of all the promises provided as input, while on any failure, the resulting promise will reject with the first failure.
Similar to the resolveAll()
methods, the waitForAll()
method produces a promise that will resolve - to null
- when all the input promises resolve, and will reject if any of the input promises reject - but it always resolves to a Void null
and therefore doesn't care about the input types - so the developer can mix inputs with different types and just wait for all of them to finish.
The combine()
method is a helper to implement the Java 8 CompletableFuture.thenCombine()
workflow where two promises - of likely different types - are resolved and fed into a mapper that can process the two different values and return a third. Unlike the the Java 8 API, here the mapper is expected to be asynchronous and return a Future
- both because we expect this to be more useful and it is also more idiomatic to handle errors (by returning a failedFuture()
) rather than throwing an unchecked exception.
The either()
method is a helper to implement the Java 8 CompletableFuture.thanEither()
workflow where two promises - of the same type - are being resolved and the first that succeeds is fed into a mapper to process it. Unlike the Java 8 API, here the mapper is expected to be asynchronous and return a Future
- both because we expect this to be more useful and it is also more idiomatic to handle errors (by returning a failedFuture()
) rather than throwing an unchecked exception. Another improvement over the Java 8 API is that this method is idempotent to whether either of the provided promises reject - if any one rejects, regardless of which, the one value that is resolved is provided to the mapper, while if both reject - the mapper will not be called and the returned promise will reject with the error of the first promise that rejected.
The recover()
method helps write more idiomatic code when using Future.otherwise()
to recover a Future
that can fail with multiple exception types. The result from using this method should look relatively more readable to developer familiar with the Java syntax try { ... } catch (AException a) { ... } catch (BException b) { ... }
, than recovery code that uses instanceof
to test for exception types.
E.g. instead of writing this:
possiblyFailingOp().otherwise(t -> {
if (t instanceof DataAccessException)
return valueIncaseOfDataError;
if (t instanceof IOException)
return valueIncaseOfIOError;
throw new RuntimeException(t); // rethrow unexpected error to be handled by onFailure handlers
})
.onSuccess(...).onFailure(...);
You should write:
possiblyFailingOp()
.otherwise(Promises.receover(DataAccessException.class, dae -> valueIncaseOfDataError))
.otherwise(Promises.receover(IOException.class, ioe -> valueIncaseOfIOError))
// no need to explicitly rethrow unhandled exceptions
.onSuccess(...).onFailure(...);
Allows to add a delay in the middle of an asynchronous chain of promises. This method generates a function that can be used in Future.compose()
to forward a value from one completion to another, inducing a specified delay (in milliseconds).
Example usage:
api.createSomeResource()
// give the resource some time to complete initialization
.compose(Futures.delay(500))
.compose(resource -> api.useResource(resource));
The onSuccess()
and onFailure()
methods are a reimplementation of Vert.x Future.onSuccess()
and Future.onFailure()
in a way that is safe in the face of exceptions thrown from the handler - when the handler throws an exception, the Future
internal methods will cause that exception to thrown into the context that executes the handler instead of propagated down the handler chain. These implementations are based on the newer Future.andThen(Handler)
method (from Vert.x 4.3.3), which propagates handler failures to the returned Future
instance.
Example usage:
import static io.cloudonix.lib.Promises.*;
// ...
getSomePromise()
.andThen(onSuccess(val -> handleValAndMaybeThrow(val)))
.andThen(onFailure(t -> reportFailure(t)));
See Vert.x issue 4455 for more details.
All of the methods are static methods in the class named Timers
.
All of the methods in the Timers
class execute their scheduled tasks on a single Timer instance named "cxlib-timer"
. This means that they will all run in a single thread so any operation performed must be short in order to not delay other operations - if a long running operation needs to be scheduled, instead schedule a call to start it on another thread, for example by running it in a CompletableFuture.*Async()
method that executes on the common fork-join pool. The timer uses a "daemon thread" - i.e. it doesn't need to be explicitly shutdown and will not prevent the JVM from terminating.
Please note that there are currently no APIs to cancel schedule or recurring tasks.
Schedule the specified operation to be executed after the specified delay in milliseconds.
Schedule the specified operation to be executed after the specified delay in the specified time unit.
Schedule the specified operation to be executed every day at midnight of UTC.
Schedule the specified operation to be executed every day at the specified UTC time.
Schedule the specified operation to be executed every day at the specified time in the specified time zone.
Schedule the specified operation to be executed recurrently, with the first ocurrence hapenning after the specified delay.
For example, to schedule an occurrence every hour from now on:
Timers.setPeriodicOperation(() -> System.out.println("Hello"), 0, TimeUnit.HOURS.toMillis(1));
All the Timers
methods return an instance of Timers.Cancellable
that can be used to cancel the scheduled task - if it had not already finished executing. For periodic operations, the Timers.Cancellable
can be used to prevent future executions even after the operation have already been executed one or more times.
As this library is composed of static methods, it is also possible to stream-line some references by statically importing them, or all references:
import static io.cloudonix.lib.Futures.*;
class MyClass(){
protected CompletableFuture<Void> promise() {
return completedFuture();
}
}
The following behavior configuration options are available by setting the specific Java system properties:
io.cloudonix.lib.futures.async_callsite_snapshots
- when usingfromAsync()
to convert Vert.x async callbacks toCompletableFuture
, in case of a failure the Java 8 runtime encodes its internal exception encoding mechanism's stack trace into the generatedCompletionException
class. By setting this property totrue
, the library will capture thefromAsync()
call site and in case of a failure, will encode the original call site stack trace into Java'sCompletionException
class. This should allow easier debugging of failed async operations. This adds a non-trivial computational cost to every call tofromAsync()
(even those that will not fail) which may be considered expensive depending on your specific scenario.