public abstract class RxRatpack extends Object
IMPORTANT: the initialize()
method must be called to fully enable integration.
The methods of this class provide bi-directional conversion between Ratpack's Promise
and RxJava's Observable
.
This allows Ratpack promise based API to be integrated into an RxJava based app and vice versa.
When using observables for asynchronous actions, it is generally required to wrap promises created by an ExecControl
in order to integrate with Ratpack's execution model.
This typically means using ExecControl.promise(Action)
or ExecControl.blocking(java.util.concurrent.Callable)
to initiate the operation and then wrapping with observe(Promise)
or similar.
To test observable based services that use Ratpack's execution semantics, use the ExecHarness
and convert the observable back to a promise with asPromise(Observable)
.
Constructor and Description |
---|
RxRatpack() |
Modifier and Type | Method and Description |
---|---|
static <T> Promise<List<T>> |
asPromise(Observable<T> observable)
Converts a Rx
Observable into a Ratpack Promise . |
static <T> Promise<T> |
asPromiseSingle(Observable<T> observable)
Convenience for converting an
Observable to a Promise when it is known that the observable sequence is of zero or one elements. |
static <T> Observable<T> |
forkAndJoin(ExecControl execControl,
Observable<T> source)
Forks the current execution in order to subscribe to the given source, then joining the original execution with the source values.
|
static <T> Observable.Operator<T,T> |
forkOnNext(ExecControl execControl)
An operator to parallelize an observable stream by forking a new execution for each omitted item.
|
static <T> Observable<T> |
forkOnNext(ExecControl execControl,
Observable<T> observable)
Alternative method for forking the execution to process each observable element.
|
static void |
initialize()
Registers an
RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler. |
static <T> Observable<T> |
observe(Promise<T> promise)
Converts a Ratpack promise into an Rx observable.
|
static <T,I extends Iterable<T>> |
observeEach(Promise<I> promise)
Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable.
|
static Scheduler |
scheduler(ExecController execController)
A scheduler that uses the application event loop and initialises each job as an
Execution (via ExecControl.exec() ). |
public static void initialize()
RxJavaObservableExecutionHook
with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
For a Java application, a convenient place to call this is in the handler factory implementation.
import ratpack.error.ServerErrorHandler;
import ratpack.rx.RxRatpack;
import ratpack.test.UnitTest;
import ratpack.test.handling.HandlingResult;
import rx.Observable;
public class Example {
public static void main(String... args) throws Exception {
RxRatpack.initialize(); // must be called once per JVM
HandlingResult result = UnitTest.requestFixture().handleChain(chain -> {
chain.register(registry ->
registry.add(ServerErrorHandler.class, (context, throwable) ->
context.render("caught by error handler: " + throwable.getMessage())
)
);
chain.get(ctx -> Observable.<String>error(new Exception("!")).subscribe((s) -> {}));
});
assert result.rendered(String.class).equals("caught by error handler: !");
}
}
public static Scheduler scheduler(ExecController execController)
Execution
(via ExecControl.exec()
).execController
- the execution controller to back the schedulerpublic static <T> Observable<T> forkAndJoin(ExecControl execControl, Observable<T> source)
This method supports parallelism in the observable stream.
This method uses Observable.toList()
on the given source to collect all values before returning control to the original execution.
As such, source
should not be an infinite or extremely large stream.
T
- the type of item observedexecControl
- the execution controlsource
- the observable sourcepublic static <T> Observable<T> observe(Promise<T> promise)
For example, this can be used to observe blocking operations.
import ratpack.exec.Promise;
import ratpack.test.handling.HandlingResult;
import static ratpack.rx.RxRatpack.observe;
import static ratpack.test.UnitTest.requestFixture;
public class Example {
public static void main(String... args) throws Exception {
HandlingResult result = requestFixture().handle(context -> {
Promise<String> promise = context.blocking(() -> "hello world");
observe(promise).map(String::toUpperCase).subscribe(context::render);
});
assert result.rendered(String.class).equals("HELLO WORLD");
}
}
T
- the type of value promisedpromise
- the promisepublic static <T> Promise<List<T>> asPromise(Observable<T> observable)
Observable
into a Ratpack Promise
.
For example, this can be used unit test Rx observables.
import ratpack.test.exec.ExecHarness;
import ratpack.test.exec.ExecResult;
import rx.Observable;
import java.util.List;
import static ratpack.rx.RxRatpack.asPromise;
public class Example {
static class AsyncService {
// Our method under test
// Typically this would be returning an Observable of an asynchronously produced value (using RxRatpack.observe()),
// but for this example we are just faking it with a simple literal observable
public <T> Observable<T> observe(final T value) {
return Observable.just(value);
}
}
public static void main(String[] args) throws Throwable {
// set up the code under test that returns observables
final AsyncService service = new AsyncService();
// exercise the async code using the harness, blocking until the promised value is available
ExecResult<List<String>> result = ExecHarness.yieldSingle(execution -> asPromise(service.observe("foo")));
List<String> results = result.getValue();
assert results.size() == 1;
assert results.get(0).equals("foo");
}
}
T
- the type of the value observedobservable
- the observableasPromiseSingle(Observable)
public static <T> Promise<T> asPromiseSingle(Observable<T> observable)
Observable
to a Promise
when it is known that the observable sequence is of zero or one elements.
Has the same behavior as asPromise(Observable)
, except that the list representation of the sequence is “unpacked”.
If the observable sequence produces no elements, the promised value will be null
.
If the observable sequence produces one element, the promised value will be that element.
If the observable sequence produces more than one element, the promised will fail with an IllegalAccessException
.
T
- the type of the value observedobservable
- the observableasPromise(Observable)
public static <T,I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise)
The promised iterable will be emitted to the observer one element at a time. For example, this can be used to observe background operations that produce some kind of iterable…
import ratpack.exec.Promise;
import ratpack.test.handling.HandlingResult;
import java.util.Arrays;
import java.util.List;
import static ratpack.rx.RxRatpack.observeEach;
import static ratpack.test.UnitTest.requestFixture;
public class Example {
public static void main(String... args) throws Exception {
HandlingResult result = requestFixture().handle(context -> {
Promise<List<String>> promise = context.blocking(() -> Arrays.asList("hello", "world"));
observeEach(promise)
.map(String::toUpperCase)
.toList()
.subscribe(strings -> context.render(String.join(" ", strings)));
});
assert result.rendered(String.class).equals("HELLO WORLD");
}
}
T
- the element type of the promised iterableI
- the type of iterablepromise
- the promiseobserve(ratpack.exec.Promise)
public static <T> Observable<T> forkOnNext(ExecControl execControl, Observable<T> observable)
This method is alternative to forkOnNext(ExecControl)
and is functionally equivalent.
T
- the element typeexecControl
- the execution control to use to fork executionsobservable
- the observable sequence to process each element of in a forked executionpublic static <T> Observable.Operator<T,T> forkOnNext(ExecControl execControl)
To be used with the Observable.lift(Observable.Operator)
method.
The onCompleted()
or onError()
downstream methods are guaranteed to be called after the last item has been given to the downstream onNext()
method.
That is, the last invocation of the downstream onNext()
will have returned before onCompleted()
or onError()
are invoked.
This is generally a more performant alternative to using plain Rx parallelization due to Ratpack's Execution
semantics and use of Netty's event loop to schedule work.
import ratpack.rx.RxRatpack; import ratpack.exec.ExecController; import ratpack.launch.LaunchConfigBuilder; import rx.Observable; import rx.functions.Func1; import rx.functions.Action1; import java.util.List; import java.util.Arrays; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; public class Example { public static void main(String[] args) throws Exception { RxRatpack.initialize(); final CyclicBarrier barrier = new CyclicBarrier(5); final ExecController execController = LaunchConfigBuilder.noBaseDir().threads(6).build().getExecController(); Integer[] myArray = {1, 2, 3, 4, 5}; Observable<Integer> source = Observable.from(myArray); List<Integer> doubledAndSorted = source .lift(RxRatpack.<Integer>forkOnNext(execController.getControl())) .map(new Func1<Integer, Integer>() { public Integer call(Integer integer) { try { barrier.await(); // prove stream is processed concurrently } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } return integer.intValue() * 2; } }) .serialize() .toSortedList() .toBlocking() .single(); try { assert doubledAndSorted.equals(Arrays.asList(2, 4, 6, 8, 10)); } finally { execController.close(); } } }
T
- the type of item in the streamexecControl
- the execution control to use to fork executions