public abstract class ReactorRatpack
extends java.lang.Object
The methods of this class provide bi-directional conversion between Ratpack's Promise and Reactor's Flux and Mono.
This allows Ratpack's promise-based API to be integrated into a Reactor-based app and vice versa.
To test flux-based services that use Ratpack's execution semantics, use the ExecHarness and convert the flux back to a promise with promise(Flux).
The methods in this class are also provided as Kotlin Extensions.
When using Groovy, each static method in this class is able to act as an instance-level method against the Flux type.
Modifier and Type | Method and Description |
---|---|
static <T> Flux<T> | bindExec(Flux<T> source): Binds the given flux to the current execution, allowing integration of third-party asynchronous fluxes with Ratpack's execution model. |
static Scheduler | computationScheduler(): A scheduler that uses the application event loop and initialises each job as an Execution (via ExecController.fork()). |
static Scheduler | computationScheduler(ExecController execController): A scheduler that uses the application event loop and initialises each job as an Execution (via ExecController.fork()). |
static Flux<java.lang.Void> | flux(Operation operation): Converts an Operation into a Flux. |
static <T> Flux<T> | flux(Promise<T> promise): Converts a Promise into a Flux. |
static <T,I extends java.lang.Iterable<T>> Flux<T> | fluxEach(Promise<I> promise): Converts a Promise for an iterable into a Flux. |
static <T> Flux<T> | fork(Flux<T> observable): Parallelize a flux by forking its execution onto a different Ratpack compute thread and automatically binding the result back to the original execution. |
static <T> Flux<T> | fork(Flux<T> flux, Action<? super RegistrySpec> doWithRegistrySpec): A variant of fork(reactor.core.publisher.Flux<T>) that allows access to the registry of the forked execution inside an Action. |
static <T> Flux<T> | forkEach(Flux<T> flux): Parallelize a flux by creating a new Ratpack execution for each element. |
static <T> Flux<T> | forkEach(Flux<T> flux, Action<? super RegistrySpec> doWithRegistrySpec): A variant of forkEach(reactor.core.publisher.Flux<T>) that allows access to the registry of each forked execution inside an Action. |
static void | initialize(): Registers a Hooks.onOperatorError(BiFunction) hook with Reactor that provides a default error handling strategy of forwarding exceptions to the execution error handler. |
static Scheduler | ioScheduler(): A scheduler that uses the application io thread pool. |
static Scheduler | ioScheduler(ExecController execController): A scheduler that uses the application io thread pool. |
static <T> Mono<T> | mono(Promise<T> promise): Converts a Promise into a Mono. |
static <T> Promise<java.util.List<T>> | promise(Flux<T> flux): Converts a Flux into a Promise, for all of the flux's items. |
static <T> Promise<T> | promiseSingle(Mono<T> mono): Converts a Mono into a Promise, for the mono's single item. |
static <T> TransformablePublisher<T> | publisher(Flux<T> flux): Converts a Flux into a Publisher, for all of the flux's items. |
public static void initialize()
Registers a Hooks.onOperatorError(BiFunction) hook with Reactor that provides a default error handling strategy of forwarding exceptions to the execution error handler.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
For a Java application, a convenient place to call this is in the handler factory implementation.
import ratpack.error.ServerErrorHandler;
import ratpack.reactor.ReactorRatpack;
import ratpack.test.embed.EmbeddedApp;
import reactor.core.publisher.Flux;

import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    ReactorRatpack.initialize(); // must be called once for the life of the JVM

    EmbeddedApp.fromHandlers(chain -> chain
      .register(s -> s
        .add(ServerErrorHandler.class, (ctx, throwable) ->
          ctx.render("caught by error handler: " + throwable.getMessage())
        )
      )
      // the subscriber has no error callback, so the error is forwarded to the ServerErrorHandler
      .get(ctx -> Flux.<String>error(new Exception("!")).subscribe(ctx::render))
    ).test(httpClient ->
      assertEquals("caught by error handler: !", httpClient.getText())
    );
  }
}
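The idempotency described above can be pictured with a small JDK-only sketch. It is not ReactorRatpack's implementation: the class OnceInitializer, its SETUP_RUNS counter, and the boolean return value are hypothetical names chosen for illustration. It only shows the general pattern of a one-time, JVM-wide registration that tolerates repeated calls.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; this is not how ReactorRatpack is implemented.
class OnceInitializer {
  // Flipped from false to true by the first caller only.
  private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

  // Counts how many times the one-time setup actually ran.
  static final AtomicInteger SETUP_RUNS = new AtomicInteger();

  /** Runs the setup on the first call only; returns true if this call performed it. */
  static boolean initialize() {
    if (INITIALIZED.compareAndSet(false, true)) {
      SETUP_RUNS.incrementAndGet(); // stands in for registering a JVM-wide hook
      return true;
    }
    return false; // later calls are harmless no-ops
  }

  public static void main(String[] args) {
    initialize();
    initialize();
    System.out.println(SETUP_RUNS.get()); // prints 1
  }
}
```

Because the guard is static, the setup runs once per JVM no matter how many applications call initialize().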
public static Flux<java.lang.Void> flux(Operation operation)
Converts an Operation into a Flux.
The returned flux completes upon completion of the operation without emitting a value, and emits the error (i.e. via onError()) if it fails.
import ratpack.exec.Operation;
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;

import static org.junit.Assert.assertTrue;

public class Example {
  public static boolean executed;

  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(e ->
      Operation.of(() -> executed = true)
        .to(ReactorRatpack::flux) // convert the operation to a Flux<Void>
        .subscribe()
    );

    assertTrue(executed);
  }
}
Parameters:
operation - the operation
public static <T> Flux<T> flux(Promise<T> promise)
Converts a Promise into a Flux.
The returned flux emits the promise's single value if it succeeds, and emits the error (i.e. via onError()) if it fails.
This method works well as a method reference to the Promise.to(ratpack.func.Function) method.
import ratpack.exec.Promise;
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;

import static org.junit.Assert.assertEquals;

public class Example {
  public static String value;

  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(e ->
      Promise.value("hello world")
        .to(ReactorRatpack::flux) // convert the promise to a Flux<String>
        .map(String::toUpperCase)
        .subscribe(s -> value = s)
    );

    assertEquals("HELLO WORLD", value);
  }
}
Type Parameters:
T - the type of value promised
Parameters:
promise - the promise
public static <T,I extends java.lang.Iterable<T>> Flux<T> fluxEach(Promise<I> promise)
Converts a Promise for an iterable into a Flux.
The promised iterable will be emitted to the subscriber one element at a time, like Flux.fromIterable(Iterable).
import ratpack.exec.Promise;
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    final List<String> items = new LinkedList<>();
    ExecHarness.runSingle(e ->
      Promise.value(Arrays.asList("foo", "bar"))
        .to(ReactorRatpack::fluxEach) // emit each element of the promised list
        .subscribe(items::add)
    );

    assertEquals(Arrays.asList("foo", "bar"), items);
  }
}
Type Parameters:
T - the element type of the promised iterable
I - the type of iterable
Parameters:
promise - the promise
See Also:
flux(ratpack.exec.Promise)
public static <T> Mono<T> mono(Promise<T> promise)
Converts a Promise into a Mono.
The returned mono emits the promise's single value if it succeeds, and emits the error (i.e. via onError()) if it fails.
This method works well as a method reference to the Promise.to(ratpack.func.Function) method.
import ratpack.exec.Promise;
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;

import static org.junit.Assert.assertEquals;

public class Example {
  public static String value;

  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(e ->
      Promise.value("hello world")
        .to(ReactorRatpack::mono) // convert the promise to a Mono<String>
        .map(String::toUpperCase)
        .subscribe(s -> value = s)
    );

    assertEquals("HELLO WORLD", value);
  }
}
Type Parameters:
T - the type of value promised
Parameters:
promise - the promise
public static <T> Promise<java.util.List<T>> promise(Flux<T> flux) throws UnmanagedThreadException
Converts a Flux into a Promise, for all of the flux's items.
This method can be used to simply adapt a flux to a promise, but can also be used to bind a flux to the current execution.
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.assertEquals;

public class Example {
  public static class AsyncService {
    // emits the value from a thread that is not managed by Ratpack
    public <T> Flux<T> flux(final T value) {
      return Flux.create(sink ->
        new Thread(() -> {
          sink.next(value);
          sink.complete();
        }).start()
      );
    }
  }

  public static void main(String[] args) throws Throwable {
    List<String> results = ExecHarness.yieldSingle(execution ->
      ReactorRatpack.promise(new AsyncService().flux("foo"))
    ).getValue();

    assertEquals(Arrays.asList("foo"), results);
  }
}
This method uses Flux.collectList() to collect the flux's contents into a list.
It therefore should not be used with fluxes that emit many or infinitely many items.
If it is expected that the flux only emits one element, it is typically more convenient to use promiseSingle(Mono).
If the flux emits an error, the returned promise will fail with that error.
This method must be called during an execution.
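The warning about many or infinite items can be illustrated with a JDK-only analogy. The sketch below is not Reactor or Ratpack code: it uses java.util.stream.Stream in place of a flux, and the class BoundedCollect and the method firstN are hypothetical names. Collecting an unbounded source into a list never finishes, so the source is bounded first; in Reactor, Flux.take(long) plays a comparable bounding role before collecting.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical JDK-only analogy; a Stream stands in for a flux.
class BoundedCollect {
  /** Bounds the source to at most n elements before collecting it into a list. */
  static <T> List<T> firstN(Stream<T> source, int n) {
    return source.limit(n).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // An infinite source: collecting it without limit(...) would never terminate.
    Stream<Integer> naturals = Stream.iterate(1, i -> i + 1);
    System.out.println(firstN(naturals, 3)); // prints [1, 2, 3]
  }
}
```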
Type Parameters:
T - the type of the value observed
Parameters:
flux - the flux
Throws:
UnmanagedThreadException - if called outside of an execution
See Also:
promiseSingle(Mono)
public static <T> Promise<T> promiseSingle(Mono<T> mono) throws UnmanagedThreadException
Converts a Mono into a Promise, for the mono's single item.
This method can be used to simply adapt a mono to a promise, but can also be used to bind a mono to the current execution.
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;
import reactor.core.publisher.Mono;

import static org.junit.Assert.assertEquals;

public class Example {
  public static class AsyncService {
    // emits the value from a thread that is not managed by Ratpack
    public <T> Mono<T> mono(final T value) {
      return Mono.create(sink ->
        // success(value) emits the value and completes the mono
        new Thread(() -> sink.success(value)).start()
      );
    }
  }

  public static void main(String[] args) throws Throwable {
    String result = ExecHarness.yieldSingle(execution ->
      ReactorRatpack.promiseSingle(new AsyncService().mono("foo"))
    ).getValue();

    assertEquals("foo", result);
  }
}
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;
import reactor.core.publisher.Mono;

import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String[] args) throws Throwable {
    String result = ExecHarness.yieldSingle(execution ->
      ReactorRatpack.promiseSingle(Mono.<String>just("foo"))
    ).getValue();

    assertEquals("foo", result);
  }
}
If it is expected that the source may emit more than one element, use promise(Flux).
If the mono emits an error, the returned promise will fail with that error.
If the mono emits no items, the returned promise will fail with a NoSuchElementException.
If the source emits more than one item, the returned promise will fail with an IllegalStateException.
This method must be called during an execution.
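The three outcomes listed above (exactly one item, no items, more than one item) can be sketched without Reactor or Ratpack. The following is a JDK-only analogy, not the library's implementation: an Iterable stands in for the reactive source, and the class SingleElement and the method single are hypothetical names.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical JDK-only analogy of "exactly one item" semantics.
class SingleElement {
  /** Returns the only element of the source, failing if there are zero or several. */
  static <T> T single(Iterable<T> source) {
    Iterator<T> it = source.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("source emitted no items");
    }
    T value = it.next();
    if (it.hasNext()) {
      throw new IllegalStateException("source emitted more than one item");
    }
    return value;
  }

  public static void main(String[] args) {
    System.out.println(single(Arrays.asList("foo"))); // prints foo
  }
}
```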
Type Parameters:
T - the type of the value observed
Parameters:
mono - the mono
Throws:
UnmanagedThreadException - if called outside of an execution
See Also:
promise(Flux)
public static <T> TransformablePublisher<T> publisher(Flux<T> flux)
Converts a Flux into a Publisher, for all of the flux's items.
This method can be used to simply adapt a flux to a Reactive Streams publisher.
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;
import reactor.core.publisher.Flux;

import java.util.List;

import static org.junit.Assert.assertEquals;

public class Example {
  public static class AsyncService {
    // emits the value from a thread that is not managed by Ratpack
    public <T> Flux<T> flux(final T value) {
      return Flux.create(sink ->
        new Thread(() -> {
          sink.next(value);
          sink.complete();
        }).start()
      );
    }
  }

  public static void main(String[] args) throws Throwable {
    List<String> result = ExecHarness.yieldSingle(execution ->
      // toList() turns the TransformablePublisher into a Promise<List<String>>
      ReactorRatpack.publisher(new AsyncService().flux("foo")).toList()
    ).getValue();

    assertEquals("foo", result.get(0));
  }
}
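Type Parameters:
T - the type of the value observed
Parameters:
flux - the flux
public static <T> Flux<T> bindExec(Flux<T> source)
Binds the given flux to the current execution, allowing integration of third-party asynchronous fluxes with Ratpack's execution model.
This method is useful when you want to consume an asynchronous flux within a Ratpack execution, as a flux.
It is just a combination of promise(Flux) and fluxEach(Promise).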
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;
import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.List;

import static org.junit.Assert.*;

public class Example {
  public static void main(String... args) throws Exception {
    // a flux that emits from a thread that is not managed by Ratpack
    Flux<String> asyncFlux = Flux.create(sink ->
      new Thread(() -> {
        sink.next("foo");
        sink.next("bar");
        sink.complete();
      }).start()
    );

    List<String> strings = ExecHarness.yieldSingle(e ->
      ReactorRatpack.promise(asyncFlux.compose(ReactorRatpack::bindExec))
    ).getValue();

    assertEquals(Arrays.asList("foo", "bar"), strings);
  }
}
Type Parameters:
T - the type of item observed
Parameters:
source - the flux source
See Also:
fluxEach(Promise), promise(Flux)
public static <T> Flux<T> fork(Flux<T> observable)
Parallelize a flux by forking its execution onto a different Ratpack compute thread and automatically binding the result back to the original execution.
This method can be used for simple parallel processing. Its behavior is similar to subscribeOn but allows the use of Ratpack compute threads. Using fork modifies the execution of the upstream flux.
This is different from forkEach, which modifies where the downstream is executed.
import ratpack.func.Pair;
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;

import reactor.core.publisher.Flux;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

public class Example {
  public static void main(String[] args) throws Exception {
    ReactorRatpack.initialize();

    try (ExecHarness execHarness = ExecHarness.harness(6)) {
      Integer sum = execHarness.yield(execution -> {
        final String originalComputeThread = Thread.currentThread().getName();

        Flux<Integer> unforkedFlux = Flux.just(1);

        // `map` is executed upstream from the fork; that puts it on another parallel compute thread
        Flux<Pair<Integer, String>> forkedFlux = Flux.just(2)
          .map((val) -> Pair.of(val, Thread.currentThread().getName()))
          .compose(ReactorRatpack::fork);

        return ReactorRatpack.promiseSingle(
          Flux.zip(unforkedFlux, forkedFlux, (Integer intVal, Pair<Integer, String> pair) -> {
            String forkedComputeThread = pair.right;
            assertNotEquals(originalComputeThread, forkedComputeThread);
            return intVal + pair.left;
          }).single() // promiseSingle takes a Mono, so narrow the zipped flux to its single element
        );
      }).getValueOrThrow();

      assertEquals(3, sum.intValue());
    }
  }
}
Type Parameters:
T - the element type
Parameters:
observable - the flux sequence to execute on a different compute thread
Returns:
a flux on the compute thread that fork was called from
See Also:
forkEach(Flux)
public static <T> Flux<T> fork(Flux<T> flux, Action<? super RegistrySpec> doWithRegistrySpec) throws java.lang.Exception
A variant of fork(reactor.core.publisher.Flux<T>) that allows access to the registry of the forked execution inside an Action.
This allows the insertion of objects via RegistrySpec.add(java.lang.Class<O>, O) that will be available to the forked flux.
You do not have access to the original execution inside the Action.
import ratpack.exec.Execution;
import ratpack.reactor.ReactorRatpack;
import ratpack.registry.RegistrySpec;
import ratpack.test.exec.ExecHarness;

import reactor.core.publisher.Flux;

import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String[] args) throws Exception {
    ReactorRatpack.initialize();

    try (ExecHarness execHarness = ExecHarness.harness(6)) {
      String concatenatedResult = execHarness.yield(execution -> {

        // the String is looked up in the registry of the execution that runs `map`
        Flux<String> notYetForked = Flux.just("foo")
          .map((value) -> value + Execution.current().get(String.class));

        Flux<String> forkedFlux = ReactorRatpack.fork(
          notYetForked,
          (RegistrySpec registrySpec) -> registrySpec.add("bar")
        );

        // promiseSingle takes a Mono, so narrow the flux to its single element
        return ReactorRatpack.promiseSingle(forkedFlux.single());
      }).getValueOrThrow();

      assertEquals("foobar", concatenatedResult);
    }
  }
}
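Type Parameters:
T - the element type
Parameters:
flux - the flux sequence to execute on a different compute thread
doWithRegistrySpec - an Action where objects can be inserted into the registry of the forked execution
Returns:
a flux on the compute thread that fork was called from
Throws:
java.lang.Exception
See Also:
fork(Flux)
public static <T> Flux<T> forkEach(Flux<T> flux)
Parallelize a flux by creating a new Ratpack execution for each element.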
import ratpack.reactor.ReactorRatpack;
import ratpack.test.exec.ExecHarness;
import ratpack.util.Exceptions;

import reactor.core.publisher.Flux;

import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;

import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String[] args) throws Exception {
    ReactorRatpack.initialize();

    CyclicBarrier barrier = new CyclicBarrier(5);

    try (ExecHarness execHarness = ExecHarness.harness(6)) {
      List<Integer> values = execHarness.yield(execution ->
        ReactorRatpack.promise(
          Flux.just(1, 2, 3, 4, 5)
            .compose(ReactorRatpack::forkEach) // parallelize
            .doOnNext(value -> Exceptions.uncheck(() -> barrier.await())) // wait for all values
            .map(integer -> integer.intValue() * 2)
        )
      ).getValue();

      // arrival order is not guaranteed, so sort before comparing
      List<Integer> sortedValues = new LinkedList<>(values);
      Collections.sort(sortedValues);
      assertEquals(Arrays.asList(2, 4, 6, 8, 10), sortedValues);
    }
  }
}
Type Parameters:
T - the element type
Parameters:
flux - the flux sequence to process each element of in a forked execution
public static <T> Flux<T> forkEach(Flux<T> flux, Action<? super RegistrySpec> doWithRegistrySpec)
A variant of forkEach(reactor.core.publisher.Flux<T>) that allows access to the registry of each forked execution inside an Action.
This allows the insertion of objects via RegistrySpec.add(java.lang.Class<O>, O) that will be available to every forked flux.
You do not have access to the original execution inside the Action.
Type Parameters:
T - the element type
Parameters:
flux - the flux sequence to process each element of in a forked execution
doWithRegistrySpec - an Action where objects can be inserted into the registry of the forked execution
See Also:
forkEach(Flux), fork(Flux, Action)
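public static Scheduler computationScheduler(ExecController execController)
A scheduler that uses the application event loop and initialises each job as an Execution (via ExecController.fork()).
Parameters:
execController - the execution controller to back the scheduler
public static Scheduler computationScheduler()
A scheduler that uses the application event loop and initialises each job as an Execution (via ExecController.fork()).
public static Scheduler ioScheduler(ExecController execController)
A scheduler that uses the application io thread pool.
Parameters:
execController - the execution controller to back the scheduler
public static Scheduler ioScheduler()
A scheduler that uses the application io thread pool.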