public interface ExecControl
    import ratpack.exec.ExecControl;
    import ratpack.exec.Promise;
    import ratpack.test.UnitTest;
    import ratpack.test.handling.HandlingResult;

    public class Example {

      public static class AsyncUpperCaseService {
        private final ExecControl control;

        public AsyncUpperCaseService(ExecControl control) {
          this.control = control;
        }

        public Promise<String> toUpper(final String lower) {
          return control.promise(f -> f.success(lower.toUpperCase()));
        }
      }

      public static void main(String[] args) throws Exception {
        HandlingResult result = UnitTest.requestFixture().handleChain(chain -> {
          ExecControl control = chain.getLaunchConfig().getExecController().getControl();
          AsyncUpperCaseService service = new AsyncUpperCaseService(control);
          chain.get(ctx -> service.toUpper("foo").then(ctx::render));
        });

        assert result.rendered(String.class).equals("FOO");
      }
    }
Note: when using the Guice integration, the exec control is made available for injection.
Modifier and Type | Method and Description |
---|---|
void | addInterceptor(ExecInterceptor execInterceptor, Action<? super Execution> continuation) - Adds an interceptor that wraps the rest of the current execution segment and all future segments of this execution. |
<T> Promise<T> | blocking(Callable<T> blockingOperation) - Performs a blocking operation on a separate thread, returning a promise for its value. |
void | fork(Action<? super Execution> action) - Forks a new execution on a separate thread. |
void | fork(Action<? super Execution> action, Action<? super Throwable> onError) |
void | fork(Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onComplete) |
ExecController | getController() |
Execution | getExecution() |
<T> Promise<T> | promise(Action<? super Fulfiller<T>> action) - Creates a promise for an asynchronously created value. |
<T> void | stream(org.reactivestreams.Publisher<T> publisher, org.reactivestreams.Subscriber<? super T> subscriber) - Process streams of data asynchronously with non-blocking back pressure. |
Execution getExecution()
ExecController getController()
void addInterceptor(ExecInterceptor execInterceptor, Action<? super Execution> continuation) throws Exception
The given action is executed immediately (i.e. as opposed to being queued to be executed as the next execution segment). Any code executed after a call to this method in the same execution segment WILL NOT be intercepted. Therefore, it is advisable to not execute any code after calling this method in a given execution segment.
See ExecInterceptor for example use of an interceptor.
Parameters:
execInterceptor - the execution interceptor to add
continuation - the rest of the code to be executed
Throws:
Exception - any thrown by continuation
See Also:
ExecInterceptor
<T> Promise<T> blocking(Callable<T> blockingOperation)
This method should be used to perform blocking IO, or to perform any operation that synchronously waits for something to happen.
The given blockingOperation will be performed on a thread from a special thread pool for such operations (i.e. not a thread from the main compute event loop).
The operation should do as little computation as possible. It should just perform the blocking operation and immediately return the result. Performing computation during the operation will degrade performance.
This method is just a specialization of promise(Action<? super Fulfiller<T>>), and shares all of the same semantics with regard to execution binding and execution-on-promise-subscription.
    import ratpack.launch.LaunchConfigBuilder;
    import ratpack.func.Action;
    import ratpack.exec.Execution;
    import ratpack.exec.ExecController;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CountDownLatch;

    public class Example {

      public static void main(String[] args) throws InterruptedException {
        ExecController controller = LaunchConfigBuilder.noBaseDir().build().getExecController();
        final CountDownLatch latch = new CountDownLatch(1);

        controller.getControl().fork(new Action<Execution>() {
          public void execute(Execution execution) {
            execution
              .getControl()
              .blocking(new Callable<String>() {
                public String call() {
                  // perform a blocking op, e.g. a database call, filesystem read etc.
                  return "foo";
                }
              })
              .then(new Action<String>() {
                public void execute(String string) {
                  // do something with the value that was obtained from a blocking operation
                  latch.countDown();
                }
              });
          }
        });

        latch.await();
      }
    }
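The advice to keep computation out of the blocking operation can be illustrated without Ratpack. The following is a minimal sketch using only JDK classes; `BlockingOffloadSketch`, `readBlocking`, and the two named pools are hypothetical stand-ins and are not part of Ratpack's API. The blocking call only fetches the raw value on a dedicated pool, and the computation on that value runs afterwards on a separate compute thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class BlockingOffloadSketch {

    /** The computed result plus the names of the threads that ran each step. */
    static final class Outcome {
        final int wordCount;
        final String ioThread;
        final String computeThread;

        Outcome(int wordCount, String ioThread, String computeThread) {
            this.wordCount = wordCount;
            this.ioThread = ioThread;
            this.computeThread = computeThread;
        }
    }

    // Hypothetical stand-in for a blocking call (database query, file read, ...).
    static String readBlocking() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "a few words to count";
    }

    static Outcome run() {
        // Assumption: one pool reserved for blocking work, one for computation.
        ExecutorService blockingPool = Executors.newCachedThreadPool(r -> new Thread(r, "blocking-1"));
        ExecutorService computePool = Executors.newSingleThreadExecutor(r -> new Thread(r, "compute-1"));
        try {
            String[] ioThread = new String[1];
            return CompletableFuture
                // Only the blocking read happens on the blocking pool.
                .supplyAsync(() -> {
                    ioThread[0] = Thread.currentThread().getName();
                    return readBlocking();
                }, blockingPool)
                // The computation on the value is handed to the compute pool.
                .thenApplyAsync(text -> new Outcome(
                    text.split(" ").length, ioThread[0], Thread.currentThread().getName()), computePool)
                .join();
        } finally {
            blockingPool.shutdown();
            computePool.shutdown();
        }
    }

    public static void main(String[] args) {
        Outcome outcome = run();
        System.out.println(outcome.wordCount + " words, read on " + outcome.ioThread
            + ", counted on " + outcome.computeThread);
    }
}
```

In Ratpack terms, the read corresponds to the Callable given to blocking and the counting to the action given to then, as in the example above.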
Type Parameters:
T - the type of value created by the operation
Parameters:
blockingOperation - the operation that blocks
<T> Promise<T> promise(Action<? super Fulfiller<T>> action)
This method can be used to integrate with APIs that produce values asynchronously.
The asynchronous API should be invoked during the execute method of the action given to this method.
The result of the asynchronous call is then passed to the Fulfiller supplied to the action.
Type Parameters:
T - the type of value
Parameters:
action - an action that invokes an asynchronous API, forwarding the result to the given fulfiller.
See Also:
Fulfiller, Fulfillment
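The pattern described for promise, invoking a callback-based API and forwarding its result to a fulfiller, can be sketched with JDK classes alone. In this sketch `CompletableFuture` plays the role of the promise and its `complete` call plays the role of `Fulfiller.success`; `Callback`, `fetchGreeting`, and `greetingPromise` are hypothetical names invented for illustration and are not Ratpack APIs.

```java
import java.util.concurrent.CompletableFuture;

class CallbackToPromiseSketch {

    /** Hypothetical callback interface of some asynchronous API. */
    interface Callback<T> {
        void onResult(T value);
        void onFailure(Throwable error);
    }

    // Hypothetical asynchronous API: reports its result later, on another thread.
    static void fetchGreeting(String name, Callback<String> callback) {
        new Thread(() -> {
            if (name.isEmpty()) {
                callback.onFailure(new IllegalArgumentException("empty name"));
            } else {
                callback.onResult("hello " + name);
            }
        }).start();
    }

    // Adapter: invoke the async API and forward its outcome to the future.
    static CompletableFuture<String> greetingPromise(String name) {
        CompletableFuture<String> promise = new CompletableFuture<>();
        fetchGreeting(name, new Callback<String>() {
            @Override
            public void onResult(String value) {
                promise.complete(value); // analogous to signalling success
            }

            @Override
            public void onFailure(Throwable error) {
                promise.completeExceptionally(error); // analogous to signalling failure
            }
        });
        return promise;
    }

    public static void main(String[] args) {
        System.out.println(greetingPromise("ratpack").join());
    }
}
```

The adapter itself never blocks: it only registers the callback and returns, which is the property that makes this style suitable for an event loop.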
void fork(Action<? super Execution> action)
This is similar to using new Thread(...).start(), except that the action will be executed on a Ratpack managed thread, and will use Ratpack's execution semantics.
Parameters:
action - the initial execution segment
void fork(Action<? super Execution> action, Action<? super Throwable> onError, Action<? super Execution> onComplete)
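The fork overloads that accept an onError action are not described further here, so the following is only a rough JDK-only sketch of the general shape: work is handed to another thread, and any exception it throws is routed to an error callback instead of being lost. The `fork` helper below is hypothetical and makes no claim about Ratpack's actual execution semantics, including when onComplete would be invoked.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

class ForkSketch {

    // Hypothetical helper: runs the action on a pool thread and
    // passes anything it throws to the onError callback.
    static void fork(ExecutorService pool, Runnable action, Consumer<? super Throwable> onError) {
        pool.execute(() -> {
            try {
                action.run();
            } catch (Throwable t) {
                onError.accept(t);
            }
        });
    }

    // Forks a failing action and returns the message seen by the error callback.
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>("none");
        CountDownLatch done = new CountDownLatch(1);
        try {
            fork(pool,
                () -> { throw new IllegalStateException("boom"); },
                error -> {
                    seen.set(error.getMessage());
                    done.countDown();
                });
            done.await(5, TimeUnit.SECONDS);
            return seen.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("error callback saw: " + run());
    }
}
```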
<T> void stream(org.reactivestreams.Publisher<T> publisher, org.reactivestreams.Subscriber<? super T> subscriber)
This method allows the processing of elements (onNext) or termination signals (onError, onComplete) to happen outside of the execution stack of the Publisher. In other words these "events" are executed asynchronously, on a Ratpack managed thread, without blocking the Publisher.
Type Parameters:
T - the type of streamed elements
Parameters:
publisher - the provider of a potentially unbounded number of sequenced elements, publishing them according to the demand received from its Subscriber(s)
subscriber - a component that accepts a sequenced stream of elements provided by a Publisher
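The publisher, subscriber, and demand vocabulary above can be tried out with the JDK's `java.util.concurrent.Flow` interfaces (Java 9+), which mirror the Reactive Streams interfaces. This is a minimal sketch using `SubmissionPublisher` as a stand-in publisher; it does not call Ratpack's stream method, and `FlowBackPressureSketch` and `CollectingSubscriber` are hypothetical names. The subscriber signals demand one element at a time, and its callbacks are delivered on the publisher's executor rather than on the thread that submits the items.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowBackPressureSketch {

    /** Subscriber that requests one element at a time and records what it receives. */
    static final class CollectingSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: a single element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 1, 2, 3 and returns what the subscriber collected.
    static List<Integer> run() {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        // Closing the publisher signals onComplete once pending items are delivered.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= 3; i++) {
                publisher.submit(i);
            }
        }
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```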