Type Parameters:
T - the type of promised value
public interface Promise<T>
A promise is a representation of a value which will become available later.
Methods (such as map(Function), flatMap(Function), cache() etc.) allow a pipeline of “operations” to be specified, through which the value travels as it becomes available.
Such operations are implemented via the transform(Function) method.
Each operation returns a new promise object, not the original promise object.
To create a promise, use the ExecControl.promise(Action) method (or one of the variants such as ExecControl.blocking(Callable)).
To test code that uses promises, use the ExecHarness.
The promise is not “activated” until the then(Action) method is called.
This method terminates the pipeline, and receives the final value.
Promise objects are multi-use.
Every promise pipeline has a value-producing function at its start.
Activating a promise (i.e. calling then(Action)) invokes the function, so each activation produces the value afresh.
The cache() operation can be used to change this behaviour.
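As a rough illustration of the laziness and multi-use behaviour described above, the following plain-JDK sketch models a promise as a wrapper around a value-producing function. The class `LazyPromiseSketch` and all of its methods are hypothetical stand-ins written for this illustration. They are synchronous, not thread-safe, and are not Ratpack's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical, simplified model of a lazy promise; not Ratpack's implementation.
final class LazyPromiseSketch<T> {
  // The value-producing function at the start of the pipeline.
  private final Supplier<T> producer;

  LazyPromiseSketch(Supplier<T> producer) {
    this.producer = producer;
  }

  // Each operation returns a new promise object that wraps this one.
  <O> LazyPromiseSketch<O> map(Function<? super T, ? extends O> transformer) {
    return new LazyPromiseSketch<O>(() -> transformer.apply(producer.get()));
  }

  // Remembers the first produced value and hands it to every later subscriber.
  LazyPromiseSketch<T> cache() {
    List<T> slot = new ArrayList<>(1);
    return new LazyPromiseSketch<T>(() -> {
      if (slot.isEmpty()) {
        slot.add(producer.get());
      }
      return slot.get(0);
    });
  }

  // Terminates the pipeline; only at this point is the producer invoked.
  void then(Consumer<? super T> receiver) {
    receiver.accept(producer.get());
  }

  static List<Long> demo() {
    AtomicLong counter = new AtomicLong();
    List<Long> seen = new ArrayList<>();
    LazyPromiseSketch<Long> uncached = new LazyPromiseSketch<Long>(counter::getAndIncrement);
    uncached.then(seen::add); // producer runs: 0
    uncached.then(seen::add); // producer runs again: 1
    LazyPromiseSketch<Long> cached = uncached.cache();
    cached.then(seen::add);   // producer runs once more: 2
    cached.then(seen::add);   // served from the cache: 2
    return seen;
  }

  public static void main(String... args) {
    System.out.println(demo());
  }
}
```

In this model, `demo()` returns `[0, 1, 2, 2]`: nothing is produced until `then` is called, each activation of the uncached promise re-runs the producer, and the cached promise runs it only once.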
Modifier and Type | Method and Description |
---|---|
default <O> Promise<O> | apply(Function<? super Promise<T>,? extends Promise<O>> function): Applies the custom operation function to this promise. |
T | block(): Blocks execution waiting for this promise to complete and returns the promised value. |
default <O> Promise<O> | blockingMap(Function<? super T,? extends O> transformer): Like map(Function), but performs the transformation on a blocking thread. |
default Promise<T> | cache(): Caches the promised value (or error) and returns it to all subscribers. |
default Promise<T> | defer(Action<? super Runnable> releaser): Allows the execution of the promise to be deferred to a later time. |
default <O> Promise<O> | flatMap(Function<? super T,? extends Promise<O>> transformer): Transforms the promised value by applying the given function to it that returns a promise for the transformed value. |
default <O> Promise<Pair<O,T>> | left(Promise<O> left) |
default <O> Promise<O> | map(Function<? super T,? extends O> transformer): Transforms the promised value by applying the given function to it. |
default Promise<T> | mapError(Function<? super Throwable,? extends T> transformer): Transforms the promise failure (potentially into a value) by applying the given function to it. |
default <O> Promise<O> | next(Promise<O> next) |
default Promise<T> | onError(Action<? super Throwable> errorHandler): Specifies the action to take if an error occurs trying to produce the promised value. |
default Promise<T> | onNull(Block action): A convenience shorthand for routing null values. |
default Promise<T> | onYield(Runnable onYield): Registers a listener that is invoked when this promise is initiated. |
default Operation | operation() |
default Operation | operation(Action<? super T> action) |
default void | result(Action<? super Result<T>> resultHandler): Consume the promised value as a Result. |
default <O> Promise<Pair<T,O>> | right(Promise<O> right) |
default Promise<T> | route(Predicate<? super T> predicate, Action<? super T> action): Allows the promised value to be handled specially if it meets the given predicate, instead of being handled by the promise subscriber. |
void | then(Action<? super T> then): Specifies what should be done with the promised object when it becomes available. |
default Promise<T> | throttled(Throttle throttle): Throttles this promise, using the given throttle. |
default <O> O | to(Function<? super Promise<T>,? extends O> function): Applies the given function to this and returns the result. |
<O> Promise<O> | transform(Function<? super Upstream<? extends T>,? extends Upstream<O>> upstreamTransformer): Apply a custom transform to this promise. |
default Promise<T> | wiretap(Action<? super Result<T>> listener): Registers a listener for the promise outcome. |
void then(Action<? super T> then)
Specifies what should be done with the promised object when it becomes available.
Important: this method can only be used from a Ratpack managed compute thread.
If it is called on any other thread, it will immediately throw an ExecutionException.
Parameters:
then - the receiver of the promised value
Throws:
ExecutionException - if not called on a Ratpack managed compute thread
<O> Promise<O> transform(Function<? super Upstream<? extends T>,? extends Upstream<O>> upstreamTransformer)
Apply a custom transform to this promise.
This method is the basis for the standard operations of this interface, such as map(Function).
The following is a non-generic implementation of a map that converts the value to upper case.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    ExecResult<String> result = ExecHarness.yieldSingle(c ->
      c.promiseOf("foo")
        .transform(up -> down ->
          up.connect(down.<String>onSuccess(value -> {
            try {
              down.success(value.toUpperCase());
            } catch (Throwable e) {
              down.error(e);
            }
          }))
        )
    );
    assertEquals("FOO", result.getValue());
  }
}
The “upstreamTransformer” function takes an upstream data source, and returns another upstream that wraps it.
It is typical for the returned upstream to invoke the Upstream.connect(Downstream) method of the given upstream during its connect method.
For more examples of transform implementations, please see the implementations of the methods of this interface.
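To make the wrapping idea concrete without depending on the library, the sketch below re-creates the shape of an upstream transformer using plain JDK types. The nested `Upstream` and `Downstream` interfaces here are hypothetical, cut-down stand-ins (the real Ratpack types have more methods), and `map` is an illustrative helper, not the library's implementation.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical, cut-down stand-ins for the upstream/downstream types; not the Ratpack interfaces.
final class TransformSketch {

  interface Downstream<T> {
    void success(T value);
    void error(Throwable throwable);
  }

  interface Upstream<T> {
    void connect(Downstream<? super T> downstream);
  }

  // A map operation expressed as an upstream transformer: the returned upstream
  // connects to the given upstream with a downstream that converts each value.
  static <T, O> Function<Upstream<? extends T>, Upstream<O>> map(Function<? super T, ? extends O> transformer) {
    return up -> down -> up.connect(new Downstream<T>() {
      @Override
      public void success(T value) {
        O mapped;
        try {
          mapped = transformer.apply(value);
        } catch (Throwable e) {
          down.error(e); // a failing transformation becomes the failure
          return;
        }
        down.success(mapped);
      }

      @Override
      public void error(Throwable throwable) {
        down.error(throwable); // upstream failures pass through untouched
      }
    });
  }

  static String demo() {
    Upstream<String> source = down -> down.success("foo");
    Upstream<String> upper = TransformSketch.<String, String>map(String::toUpperCase).apply(source);
    AtomicReference<String> received = new AtomicReference<>();
    upper.connect(new Downstream<String>() {
      @Override
      public void success(String value) {
        received.set(value);
      }

      @Override
      public void error(Throwable throwable) {
        throw new AssertionError(throwable);
      }
    });
    return received.get();
  }

  public static void main(String... args) {
    System.out.println(demo());
  }
}
```

Here `demo()` returns `"FOO"`. Nothing flows until `connect` is called on the outermost upstream, which mirrors how the returned upstream invokes the given upstream during its own connect method.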
Type Parameters:
O - the type of item emitted by the transformed upstream
Parameters:
upstreamTransformer - a function that returns a new upstream, typically wrapping the given upstream argument
T block() throws Exception
Blocks execution waiting for this promise to complete and returns the promised value.
This method allows an asynchronous API to be used from a synchronous API. This may be needed when integrating with other libraries that are not asynchronous. The following example simulates using a library that takes a callback that is expected to produce a value synchronously, but where the production of the value is actually asynchronous.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.func.Factory;
import static org.junit.Assert.assertEquals;
public class Example {
  static <T> T produceSync(Factory<? extends T> factory) throws Exception {
    return factory.create();
  }
  public static void main(String... args) throws Exception {
    ExecResult<String> result = ExecHarness.yieldSingle(e ->
      e.blocking(() ->
        produceSync(() ->
          e.promiseOf("foo").block() // block and wait for the promised value
        )
      )
    );
    assertEquals("foo", result.getValue());
  }
}
Important: this method can only be used inside an ExecControl.blocking(Callable) or blockingMap(Function) function.
That is, it can only be used from a Ratpack managed blocking thread.
If it is called on any other thread, it will immediately throw an ExecutionException.
When this method is called, the promise will be subscribed to on a compute thread while the blocking thread waits.
When the promised value has been produced, and the compute thread segment has completed, the value will be returned, allowing execution to continue on the blocking thread.
The following example visualises this flow by capturing the sequence of events via an ExecInterceptor.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import ratpack.exec.ExecInterceptor;
import java.util.List;
import java.util.ArrayList;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    List<String> events = new ArrayList<>();
    ExecHarness.yieldSingle(
      r -> r.add(ExecInterceptor.class, (execution, execType, continuation) -> {
        events.add(execType + "-start");
        try {
          continuation.execute();
        } finally {
          events.add(execType + "-stop");
        }
      }),
      e -> e.blocking(() -> e.promiseOf("foo").block())
    );
    // The sequence the interceptor is expected to record
    List<String> expectedEvents = Arrays.asList(
      "COMPUTE-start",
      "COMPUTE-stop",
      "BLOCKING-start",
      "COMPUTE-start",
      "COMPUTE-stop",
      "BLOCKING-stop",
      "COMPUTE-start",
      "COMPUTE-stop"
    );
    assertEquals(expectedEvents, events);
  }
}
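The hand-off between the two kinds of thread can also be pictured with plain JDK executors. The sketch below is an analogy only: `BlockSketch`, the two single-thread pools and the thread names are assumptions made for illustration, and `CompletableFuture` is eager where a promise is lazy, so this is not how Ratpack implements block().

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-JDK analogy for block(): a "blocking" thread waits while a "compute" thread produces the value.
final class BlockSketch {
  static String demo() {
    ExecutorService compute = Executors.newSingleThreadExecutor(r -> new Thread(r, "compute"));
    ExecutorService blocking = Executors.newSingleThreadExecutor(r -> new Thread(r, "blocking"));
    try {
      Callable<String> blockingWork = () -> {
        // The value is produced asynchronously on the compute thread ...
        CompletableFuture<String> promised = CompletableFuture.supplyAsync(
          () -> "foo from " + Thread.currentThread().getName(), compute);
        // ... while the blocking thread waits here, then continues with the value.
        return promised.get() + ", returned on " + Thread.currentThread().getName();
      };
      return blocking.submit(blockingWork).get();
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      compute.shutdown();
      blocking.shutdown();
    }
  }

  public static void main(String... args) {
    System.out.println(demo());
  }
}
```

`demo()` returns `"foo from compute, returned on blocking"`, matching the order of the BLOCKING and nested COMPUTE segments in the event list above.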
Throws:
ExecutionException - if not called on a Ratpack managed blocking thread
Exception - any thrown while producing the value
default Promise<T> onError(Action<? super Throwable> errorHandler)
Specifies the action to take if an error occurs trying to produce the promised value.
Parameters:
errorHandler - the action to take if an error occurs
default void result(Action<? super Result<T>> resultHandler)
Consume the promised value as a Result.
This method is an alternative to then(Action) and onError(Action).
Parameters:
resultHandler - the consumer of the result
default <O> Promise<O> map(Function<? super T,? extends O> transformer)
Transforms the promised value by applying the given function to it.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    ExecResult<String> result = ExecHarness.yieldSingle(c ->
      c.blocking(() -> "foo")
        .map(String::toUpperCase)
        .map(s -> s + "-BAR")
    );
    assertEquals("FOO-BAR", result.getValue());
  }
}
Type Parameters:
O - the type of the transformed object
Parameters:
transformer - the transformation to apply to the promised value
default Operation operation()
default Promise<T> mapError(Function<? super Throwable,? extends T> transformer)
Transforms the promise failure (potentially into a value) by applying the given function to it.
If the function returns a value, the promise will now be considered successful.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    ExecResult<String> result = ExecHarness.yieldSingle(c ->
      c.<String>failedPromise(new Exception("!"))
        .mapError(e -> "value")
    );
    assertEquals("value", result.getValue());
  }
}
If the function throws an exception, that exception will now represent the promise failure.
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    ExecResult<String> result = ExecHarness.yieldSingle(c ->
      c.<String>failedPromise(new Exception("!"))
        .mapError(e -> { throw new RuntimeException("mapped", e); })
    );
    assertEquals("mapped", result.getThrowable().getMessage());
  }
}
The function will not be called if the promise is successful.
Parameters:
transformer - the transformation to apply to the promise failure
default <O> Promise<O> apply(Function<? super Promise<T>,? extends Promise<O>> function)
Applies the custom operation function to this promise.
This method can be used to apply custom operations without breaking the “code flow”. It works particularly well with method references.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    Integer value = ExecHarness.yieldSingle(e ->
      e.blocking(() -> 1)
        .apply(Example::dubble)
        .apply(Example::triple)
    ).getValue();
    assertEquals(Integer.valueOf(6), value);
  }
  public static Promise<Integer> dubble(Promise<Integer> input) {
    return input.map(i -> i * 2);
  }
  public static Promise<Integer> triple(Promise<Integer> input) {
    return input.map(i -> i * 3);
  }
}
If the apply function throws an exception, the returned promise will fail.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    Throwable error = ExecHarness.yieldSingle(e ->
      e.blocking(() -> 1)
        .apply(Example::explode)
    ).getThrowable();
    assertEquals("bang!", error.getMessage());
  }
  public static Promise<Integer> explode(Promise<Integer> input) throws Exception {
    throw new Exception("bang!");
  }
}
If the promise to which the operation is applied fails, the operation will not be applied.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    Throwable error = ExecHarness.yieldSingle(e ->
      e.<Integer>blocking(() -> { throw new Exception("bang!"); })
        .apply(Example::dubble)
    ).getThrowable();
    assertEquals("bang!", error.getMessage());
  }
  public static Promise<Integer> dubble(Promise<Integer> input) {
    return input.map(i -> i * 2);
  }
}
Type Parameters:
O - the type of promised object after the operation
Parameters:
function - the operation implementation
default <O> O to(Function<? super Promise<T>,? extends O> function) throws Exception
Applies the given function to this and returns the result.
This method can be useful when needing to convert a promise to another type as it facilitates doing so without breaking the “code flow”. For example, this can be used when integrating with RxJava.
import ratpack.rx.RxRatpack;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
  private static final List<String> LOG = new LinkedList<>();
  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(e ->
      e.blocking(() -> "foo")
        .to(RxRatpack::observe)
        .doOnNext(i -> LOG.add("doOnNext"))
        .subscribe(LOG::add)
    );
    assertEquals(Arrays.asList("doOnNext", "foo"), LOG);
  }
}
The given function is executed immediately.
This method should only be used when converting a promise to another type.
See apply(Function) for applying custom promise operators.
Type Parameters:
O - the type the promise will be converted to
Parameters:
function - the promise conversion function
Throws:
Exception - any thrown by the given function
default <O> Promise<O> blockingMap(Function<? super T,? extends O> transformer)
Like map(Function), but performs the transformation on a blocking thread.
This is simply a more convenient form of using ExecControl.blocking(java.util.concurrent.Callable) and flatMap(Function).
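Type Parameters:
O - the type of the transformed object
Parameters:
transformer - the transformation to apply to the promised value, on a blocking thread
default <O> Promise<O> flatMap(Function<? super T,? extends Promise<O>> transformer)
Transforms the promised value by applying the given function to it that returns a promise for the transformed value.
This is useful when the transformation involves an asynchronous operation.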
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String[] args) throws Exception {
    ExecResult<String> result = ExecHarness.yieldSingle(c ->
      c.blocking(() -> "foo")
        .flatMap(s -> c.blocking(s::toUpperCase))
        .map(s -> s + "-BAR")
    );
    assertEquals("FOO-BAR", result.getValue());
  }
}
In the above example, flatMap() is being used because the transformation requires a blocking operation (it doesn't really in this case, but that's what the example is showing).
In this case, it would be more convenient to use blockingMap(Function).
Type Parameters:
O - the type of the transformed object
Parameters:
transformer - the transformation to apply to the promised value
See Also:
blockingMap(Function)
default Promise<T> route(Predicate<? super T> predicate, Action<? super T> action)
Allows the promised value to be handled specially if it meets the given predicate, instead of being handled by the promise subscriber.
This is typically used for validating values centrally.
import com.google.common.collect.Lists;
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import java.util.List;
import static org.junit.Assert.*;
public class Example {
  public static ExecResult<Integer> yield(int i, List<Integer> collector) throws Exception {
    return ExecHarness.yieldSingle(c ->
      c.<Integer>promise(f -> f.success(i))
        .route(v -> v > 5, collector::add)
    );
  }
  public static void main(String... args) throws Exception {
    List<Integer> routed = Lists.newLinkedList();
    ExecResult<Integer> result1 = yield(1, routed);
    assertEquals(Integer.valueOf(1), result1.getValue());
    assertFalse(result1.isComplete()); // false because promise returned a value before the execution completed
    assertTrue(routed.isEmpty());
    ExecResult<Integer> result10 = yield(10, routed);
    assertNull(result10.getValue());
    assertTrue(result10.isComplete()); // true because the execution completed before the promised value was returned (i.e. it was routed)
    assertTrue(routed.contains(10));
  }
}
Be careful about using this where the eventual promise subscriber is unlikely to know that the promise will be routed, as it can be surprising when neither the promised value nor an error appears.
It can be useful at the handler layer to provide common validation.
import ratpack.exec.Promise;
import ratpack.handling.Context;
import ratpack.test.embed.EmbeddedApp;
import static org.junit.Assert.assertEquals;
public class Example {
  public static Promise<Integer> getAge(Context ctx) {
    return ctx
      .blocking(() -> 10) // e.g. fetch value from DB
      .route(
        i -> i < 21,
        i -> ctx.render(i + " is too young to be here!")
      );
  }
  public static void main(String... args) throws Exception {
    EmbeddedApp.fromHandler(ctx ->
      getAge(ctx).then(age -> ctx.render("welcome!"))
    ).test(httpClient -> {
      assertEquals("10 is too young to be here!", httpClient.getText());
    });
  }
}
If the routed-to action throws an exception, it will be forwarded down the promise chain.
Parameters:
predicate - the condition under which the value should be routed
action - the terminal action for the value
default Promise<T> onNull(Block action)
A convenience shorthand for routing null values.
If the promised value is null, the given action will be called.
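One way to picture the relationship between routing and the null shorthand is the plain-JDK sketch below. `RouteSketch` is a hypothetical, synchronous model written for this illustration; expressing `onNull` as a `route` with a null check is an assumption about how such a shorthand could be built, and error forwarding is not modelled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.Supplier;

// Hypothetical, simplified model of routing; not Ratpack's implementation.
final class RouteSketch<T> {
  // Connects a subscriber to the promised value when invoked.
  private final Consumer<Consumer<? super T>> connect;

  private RouteSketch(Consumer<Consumer<? super T>> connect) {
    this.connect = connect;
  }

  static <T> RouteSketch<T> of(Supplier<? extends T> producer) {
    return new RouteSketch<T>(subscriber -> subscriber.accept(producer.get()));
  }

  // If the value matches the predicate, the action handles it and the subscriber never sees it.
  RouteSketch<T> route(Predicate<? super T> predicate, Consumer<? super T> action) {
    return new RouteSketch<T>(subscriber -> connect.accept(value -> {
      if (predicate.test(value)) {
        action.accept(value);
      } else {
        subscriber.accept(value);
      }
    }));
  }

  // Shorthand: route null values to the given action.
  RouteSketch<T> onNull(Runnable action) {
    return route(Objects::isNull, ignored -> action.run());
  }

  void then(Consumer<? super T> subscriber) {
    connect.accept(subscriber);
  }

  static List<String> demo() {
    List<String> events = new ArrayList<>();
    RouteSketch.<String>of(() -> null)
      .onNull(() -> events.add("routed: null"))
      .then(v -> events.add("then: " + v));
    RouteSketch.<String>of(() -> "foo")
      .onNull(() -> events.add("routed: null"))
      .then(v -> events.add("then: " + v));
    return events;
  }

  public static void main(String... args) {
    System.out.println(demo());
  }
}
```

`demo()` returns `[routed: null, then: foo]`: the null value is consumed by the routed action and never reaches the subscriber, while the non-null value passes through unchanged.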
Parameters:
action - the action to route to if the promised value is null
default Promise<T> cache()
Caches the promised value (or error) and returns it to all subscribers.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(c -> {
      AtomicLong counter = new AtomicLong();
      Promise<Long> uncached = c.promise(f -> f.success(counter.getAndIncrement()));
      uncached.then(i -> assertEquals(0L, i.longValue()));
      uncached.then(i -> assertEquals(1L, i.longValue()));
      uncached.then(i -> assertEquals(2L, i.longValue()));
      Promise<Long> cached = uncached.cache();
      cached.then(i -> assertEquals(3L, i.longValue()));
      cached.then(i -> assertEquals(3L, i.longValue()));
      uncached.then(i -> assertEquals(4L, i.longValue()));
      cached.then(i -> assertEquals(3L, i.longValue()));
    });
  }
}
If the cached promise fails, the same exception will be returned every time.
import ratpack.exec.Promise;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertTrue;
public class Example {
  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(c -> {
      Throwable error = new Exception("bang!");
      Promise<Object> cached = c.promise(f -> f.error(error)).cache();
      cached.onError(t -> assertTrue(t == error)).then(i -> assertTrue("not called", false));
      cached.onError(t -> assertTrue(t == error)).then(i -> assertTrue("not called", false));
      cached.onError(t -> assertTrue(t == error)).then(i -> assertTrue("not called", false));
    });
  }
}
default Promise<T> defer(Action<? super Runnable> releaser)
Allows the execution of the promise to be deferred to a later time.
When the returned promise is subscribed to, the given releaser action will be invoked.
The execution of this promise is deferred until the runnable given to the releaser is run.
It is generally more convenient to use throttled(Throttle) or onYield(Runnable) than this operation.
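The releaser contract described above can be sketched with plain JDK types. `DeferSketch` is a hypothetical, synchronous model written for this illustration and is not Ratpack's implementation; it only shows that subscribing hands a runnable to the releaser and that nothing is produced until that runnable is run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of deferral; not Ratpack's implementation.
final class DeferSketch<T> {
  // Connects a subscriber to the promised value when invoked.
  private final Consumer<Consumer<? super T>> connect;

  private DeferSketch(Consumer<Consumer<? super T>> connect) {
    this.connect = connect;
  }

  static <T> DeferSketch<T> of(Supplier<? extends T> producer) {
    return new DeferSketch<T>(subscriber -> subscriber.accept(producer.get()));
  }

  // On subscription, give the releaser a runnable; production starts only when it is run.
  DeferSketch<T> defer(Consumer<Runnable> releaser) {
    return new DeferSketch<T>(subscriber -> {
      Runnable release = () -> connect.accept(subscriber);
      releaser.accept(release);
    });
  }

  void then(Consumer<? super T> subscriber) {
    connect.accept(subscriber);
  }

  static List<String> demo() {
    List<String> events = new ArrayList<>();
    AtomicReference<Runnable> held = new AtomicReference<>();
    DeferSketch.<String>of(() -> {
        events.add("produce");
        return "foo";
      })
      .defer(held::set) // keep hold of the runnable instead of running it
      .then(v -> events.add("then: " + v));
    events.add("subscribed"); // nothing has been produced yet
    held.get().run();         // release: the value is produced now
    return events;
  }

  public static void main(String... args) {
    System.out.println(demo());
  }
}
```

`demo()` returns `[subscribed, produce, then: foo]`, showing that production waits for the released runnable rather than for the subscription.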
Parameters:
releaser - the action that will initiate the execution some time later
default Promise<T> onYield(Runnable onYield)
Registers a listener that is invoked when this promise is initiated.
import com.google.common.collect.Lists;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    List<String> events = Lists.newLinkedList();
    ExecHarness.runSingle(c ->
      c.<String>promise(f -> {
        events.add("promise");
        f.success("foo");
      })
        .onYield(() -> events.add("onYield"))
        .then(v -> events.add("then"))
    );
    assertEquals(Arrays.asList("onYield", "promise", "then"), events);
  }
}
Parameters:
onYield - the action to take when the promise is initiated
Returns:
this promise
default Promise<T> wiretap(Action<? super Result<T>> listener)
Registers a listener for the promise outcome.
import com.google.common.collect.Lists;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
public class Example {
  public static void main(String... args) throws Exception {
    List<String> events = Lists.newLinkedList();
    ExecHarness.runSingle(c ->
      c.<String>promise(f -> {
        events.add("promise");
        f.success("foo");
      })
        .wiretap(r -> events.add("wiretap: " + r.getValue()))
        .then(v -> events.add("then"))
    );
    assertEquals(Arrays.asList("promise", "wiretap: foo", "then"), events);
  }
}
Parameters:
listener - the result listener
Returns:
this promise
default Promise<T> throttled(Throttle throttle)
Throttles this promise, using the given throttle.
Throttling can be used to limit concurrency, typically to limit concurrent use of an external resource, such as an HTTP API.
Note that the Throttle instance given defines the actual throttling semantics.
import ratpack.exec.Throttle;
import ratpack.test.exec.ExecHarness;
import ratpack.exec.ExecResult;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertTrue;
public class Example {
  public static void main(String... args) throws Exception {
    int numJobs = 1000;
    int maxAtOnce = 10;
    ExecResult<Integer> result = ExecHarness.yieldSingle(exec -> {
      AtomicInteger maxConcurrent = new AtomicInteger();
      AtomicInteger active = new AtomicInteger();
      AtomicInteger done = new AtomicInteger();
      Throttle throttle = Throttle.ofSize(maxAtOnce);
      // Launch numJobs forked executions, and return the maximum number that were executing at any given time
      return exec.promise(outerFulfiller -> {
        for (int i = 0; i < numJobs; i++) {
          exec.fork().start(forkedExec ->
            forkedExec.<Integer>promise(innerFulfiller -> {
              int activeNow = active.incrementAndGet();
              int maxConcurrentVal = maxConcurrent.updateAndGet(m -> Math.max(m, activeNow));
              active.decrementAndGet();
              innerFulfiller.success(maxConcurrentVal);
            })
              .throttled(throttle) // limit concurrency
              .then(max -> {
                if (done.incrementAndGet() == numJobs) {
                  outerFulfiller.success(max);
                }
              })
          );
        }
      });
    });
    assertTrue(result.getValue() <= maxAtOnce);
  }
}
Parameters:
throttle - the particular throttle to use to throttle the operation
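For readers who want to see the concurrency limit in isolation, the following plain-JDK sketch caps the number of simultaneously running jobs with a `Semaphore`. This is an analogy only: `ThrottleSketch` and its method are hypothetical, and a semaphore parks waiting threads, which is an assumption of this sketch and not a description of how a Throttle works.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Plain-JDK illustration of limiting concurrency; not how Ratpack's Throttle is implemented.
final class ThrottleSketch {
  // Runs numJobs jobs with at most maxAtOnce inside the guarded section, and
  // returns the highest number observed inside it at the same time.
  static int maxObservedConcurrency(int numJobs, int maxAtOnce) {
    Semaphore permits = new Semaphore(maxAtOnce);
    AtomicInteger active = new AtomicInteger();
    AtomicInteger maxConcurrent = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(numJobs);
    ExecutorService pool = Executors.newFixedThreadPool(maxAtOnce * 4);
    try {
      for (int i = 0; i < numJobs; i++) {
        pool.execute(() -> {
          try {
            permits.acquire(); // wait for a free slot
            try {
              int activeNow = active.incrementAndGet();
              maxConcurrent.accumulateAndGet(activeNow, Math::max);
              Thread.sleep(1); // simulate use of the external resource
            } finally {
              active.decrementAndGet();
              permits.release();
            }
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          } finally {
            done.countDown();
          }
        });
      }
      done.await();
      return maxConcurrent.get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String... args) {
    System.out.println(maxObservedConcurrency(200, 5) <= 5);
  }
}
```

The pool deliberately has more threads than permits, so the bound on the observed maximum comes from the semaphore and not from the pool size.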