public abstract class RxRatpack extends Object
IMPORTANT: the initialize()
method must be called to fully enable integration.
Constructor and Description |
---|
RxRatpack() |
Modifier and Type | Method and Description |
---|---|
static <T> Observable<T> |
forkAndJoin(ExecControl execControl,
Observable<T> source)
Forks the current execution in order to subscribe to the given source, then joining the original execution with the source values.
|
static <T> Observable.Operator<T,T> |
forkOnNext(ExecControl execControl)
An operator to parallelize an observable stream by forking a new execution for each omitted item.
|
static <T> Observable<T> |
forkOnNext(ExecControl execControl,
Observable<T> observable)
Alternative method for forking the execution to process each observable element.
|
static void |
initialize()
Registers an
RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler. |
static <T> Observable<T> |
observe(Promise<T> promise)
Converts a Ratpack promise into an Rx observable.
|
static <T,I extends Iterable<T>> |
observeEach(Promise<I> promise)
Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable.
|
static Scheduler |
scheduler(ExecController execController)
A scheduler that uses the application event loop and initialises each job as an
Execution (via ExecControl.fork(ratpack.func.Action) ). |
static <T> Subscriber<? super T> |
subscriber(Fulfiller<T> fulfiller) |
public static void initialize()
RxJavaObservableExecutionHook
with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
For a Java application, a convenient place to call this is in the handler factory implementation.
import ratpack.error.ServerErrorHandler;
import ratpack.rx.RxRatpack;
import ratpack.test.UnitTest;
import ratpack.test.handling.HandlingResult;
import rx.Observable;
public class Example {
public static void main(String... args) throws Exception {
RxRatpack.initialize(); // must be called once per JVM
HandlingResult result = UnitTest.requestFixture().handleChain(chain -> {
chain.register(registry ->
registry.add(ServerErrorHandler.class, (context, throwable) ->
context.render("caught by error handler: " + throwable.getMessage())
)
);
chain.get(ctx -> Observable.<String>error(new Exception("!")).subscribe((s) -> {}));
});
assert result.rendered(String.class).equals("caught by error handler: !");
}
}
public static Scheduler scheduler(ExecController execController)
Execution
(via ExecControl.fork(ratpack.func.Action)
).execController
- the execution controller to back the schedulerpublic static <T> Observable<T> forkAndJoin(ExecControl execControl, Observable<T> source)
This method supports parallelism in the observable stream.
This method uses Observable.toList()
on the given source to collect all values before returning control to the original execution.
As such, source
should not be an infinite or extremely large stream.
T
- the type of item observedexecControl
- the execution controlsource
- the observable sourcepublic static <T> Observable<T> observe(Promise<T> promise)
For example, this can be used to observe blocking operations.
import ratpack.exec.Promise;
import ratpack.test.handling.HandlingResult;
import static ratpack.rx.RxRatpack.observe;
import static ratpack.test.UnitTest.requestFixture;
public class Example {
public static void main(String... args) throws Exception {
HandlingResult result = requestFixture().handle(context -> {
Promise<String> promise = context.blocking(() -> "hello world");
observe(promise).map(String::toUpperCase).subscribe(context::render);
});
assert result.rendered(String.class).equals("HELLO WORLD");
}
}
T
- the type of value promisedpromise
- the promisepublic static <T,I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise)
The promised iterable will be emitted to the observer one element at a time. For example, this can be used to observe background operations that produce some kind of iterable…
import ratpack.exec.Promise;
import ratpack.test.handling.HandlingResult;
import java.util.Arrays;
import java.util.List;
import static ratpack.rx.RxRatpack.observeEach;
import static ratpack.test.UnitTest.requestFixture;
public class Example {
public static void main(String... args) throws Exception {
HandlingResult result = requestFixture().handle(context -> {
Promise<List<String>> promise = context.blocking(() -> Arrays.asList("hello", "world"));
observeEach(promise)
.map(String::toUpperCase)
.toList()
.subscribe(strings -> context.render(String.join(" ", strings)));
});
assert result.rendered(String.class).equals("HELLO WORLD");
}
}
T
- the element type of the promised iterableI
- the type of iterablepromise
- the promiseobserve(ratpack.exec.Promise)
public static <T> Observable<T> forkOnNext(ExecControl execControl, Observable<T> observable)
This method is alternative to forkOnNext(ratpack.exec.ExecControl)
and is functionally equivalent.
T
- the element typeexecControl
- the execution control to use to fork executionsobservable
- the observable sequence to process each element of in a forked executionpublic static <T> Observable.Operator<T,T> forkOnNext(ExecControl execControl)
To be used with the Observable.lift(rx.Observable.Operator)
method.
The onCompleted()
or onError()
downstream methods are guaranteed to be called after the last item has been given to the downstream onNext()
method.
That is, the last invocation of the downstream onNext()
will have returned before onCompleted()
or onError()
are invoked.
This is generally a more performant alternative to using Observable.parallel(rx.functions.Func1<rx.Observable<T>, rx.Observable<R>>)
due to Ratpack's Execution
semantics and use of Netty's event loop to schedule work.
import ratpack.rx.RxRatpack; import ratpack.exec.ExecController; import ratpack.launch.LaunchConfigBuilder; import rx.Observable; import rx.functions.Func1; import rx.functions.Action1; import java.util.List; import java.util.Arrays; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.BrokenBarrierException; public class Example { public static void main(String[] args) throws Exception { RxRatpack.initialize(); final CyclicBarrier barrier = new CyclicBarrier(5); final ExecController execController = LaunchConfigBuilder.noBaseDir().threads(6).build().getExecController(); Integer[] myArray = {1, 2, 3, 4, 5}; Observable<Integer> source = Observable.from(myArray); List<Integer> doubledAndSorted = source .lift(RxRatpack.<Integer>forkOnNext(execController.getControl())) .map(new Func1<Integer, Integer>() { public Integer call(Integer integer) { try { barrier.await(); // prove stream is processed concurrently } catch (InterruptedException | BrokenBarrierException e) { throw new RuntimeException(e); } return integer.intValue() * 2; } }) .serialize() .toSortedList() .toBlocking() .single(); try { assert doubledAndSorted.equals(Arrays.asList(2, 4, 6, 8, 10)); } finally { execController.close(); } } }
T
- the type of item in the streamexecControl
- the execution control to use to fork executionspublic static <T> Subscriber<? super T> subscriber(Fulfiller<T> fulfiller)