public abstract class RxRatpack extends Object
IMPORTANT: the initialize() method must be called to fully enable integration.
Constructor and Description |
---|
RxRatpack() |
Modifier and Type | Method and Description |
---|---|
static <T> Observable<T> | forkAndJoin(ExecControl execControl, Observable<T> source): Forks the current execution in order to subscribe to the given source, then joins the original execution with the source values. |
static <T> Observable.Operator<T,T> | forkOnNext(ExecControl execControl): An operator to parallelize an observable stream by forking a new execution for each emitted item. |
static <T> Observable<T> | forkOnNext(ExecControl execControl, Observable<T> observable): Alternative method for forking the execution to process each observable element. |
static void | initialize(): Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler. |
static <T> Observable<T> | observe(Promise<T> promise): Converts a Ratpack promise into an Rx observable. |
static <T,I extends Iterable<T>> Observable<T> | observeEach(Promise<I> promise): Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable. |
static Scheduler | scheduler(ExecController execController): A scheduler that uses the application event loop and initialises each job as an Execution (via ExecControl.fork(ratpack.func.Action)). |
static <T> Subscriber<? super T> | subscriber(Fulfiller<T> fulfiller) |
public static void initialize()
Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
For a Java application, a convenient place to call this is in the handler factory implementation.
    import ratpack.launch.HandlerFactory;
    import ratpack.launch.LaunchConfig;
    import ratpack.handling.Handler;
    import ratpack.handling.Handlers;
    import ratpack.handling.Context;
    import ratpack.handling.ChainAction;
    import ratpack.error.ServerErrorHandler;
    import ratpack.registry.RegistrySpecAction;
    import ratpack.rx.RxRatpack;
    import rx.Observable;
    import rx.functions.Action1;

    public class Example {
      public static class MyHandlerFactory implements HandlerFactory {
        public Handler create(LaunchConfig launchConfig) throws Exception {
          // Enable Rx integration
          RxRatpack.initialize();

          return Handlers.chain(launchConfig, new ChainAction() {
            public void execute() throws Exception {
              register(new RegistrySpecAction() { // register a custom error handler
                public void execute() {
                  add(ServerErrorHandler.class, new ServerErrorHandler() {
                    public void error(Context context, Throwable throwable) {
                      context.render("caught by error handler!");
                    }
                  });
                }
              });

              get(new Handler() {
                public void handle(Context context) {
                  // An observable sequence with no defined error handler
                  // The error will be propagated to context error handler implicitly
                  Observable.<String>error(new Exception("!")).subscribe(new Action1<String>() {
                    public void call(String str) {
                      // will never be called
                    }
                  });
                }
              });
            }
          });
        }
      }
    }
For a Groovy DSL application, it can be registered during the module bindings.
    import ratpack.handling.Context
    import ratpack.error.ServerErrorHandler
    import ratpack.rx.RxRatpack
    import rx.Observable

    import static ratpack.groovy.test.embed.EmbeddedApplications.embeddedApp
    import static ratpack.test.http.TestHttpClients.testHttpClient

    class CustomErrorHandler implements ServerErrorHandler {
      void error(Context context, Throwable throwable) {
        context.render("caught by error handler!")
      }
    }

    def app = embeddedApp {
      bindings {
        // Enable Rx integration
        RxRatpack.initialize()
        bind ServerErrorHandler, CustomErrorHandler
      }
      handlers {
        get {
          // An observable sequence with no defined error handler
          // The error will be propagated to context error handler implicitly
          Observable.error(new Exception("!")).subscribe {
            // will never happen
          }
        }
      }
    }

    def client = testHttpClient(app)
    try {
      client.getText() == "caught by error handler!"
    } finally {
      app.close()
    }
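The idempotency described for initialize() (repeated calls have no further effect, and one call per JVM is enough) can be pictured with a small plain-JDK sketch. This is not Ratpack's implementation: the class OneTimeHook and its INSTALL_COUNT counter are hypothetical names used only for illustration, and the counter stands in for the real hook registration.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a one-time, JVM-wide registration; not Ratpack's actual code.
final class OneTimeHook {
    private static final AtomicBoolean REGISTERED = new AtomicBoolean(false);
    // Counts how many times the (simulated) hook was really installed.
    static final AtomicInteger INSTALL_COUNT = new AtomicInteger();

    private OneTimeHook() {}

    // Safe to call repeatedly and from several threads: only the first call installs.
    static void initialize() {
        if (REGISTERED.compareAndSet(false, true)) {
            INSTALL_COUNT.incrementAndGet(); // the real hook registration would go here
        }
    }

    public static void main(String[] args) {
        initialize();
        initialize();
        System.out.println(INSTALL_COUNT.get()); // prints 1
    }
}
```

Because the guard is a static field, the state is shared by everything loaded by the same class loader, which mirrors the "once per JVM" wording above under that assumption.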
public static Scheduler scheduler(ExecController execController)
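A scheduler that uses the application event loop and initialises each job as an Execution (via ExecControl.fork(ratpack.func.Action)).
execController - the execution controller to back the scheduler
public static <T> Observable<T> forkAndJoin(ExecControl execControl, Observable<T> source)
Forks the current execution in order to subscribe to the given source, then joins the original execution with the source values.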
This method supports parallelism in the observable stream.
This method uses Observable.toList() on the given source to collect all values before returning control to the original execution. As such, source should not be an infinite or extremely large stream.
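The collect-then-join behaviour described above can be sketched with plain JDK types. This is only an analogy under stated assumptions, not Ratpack's implementation: ForkAndJoinSketch and its forkAndJoin method are hypothetical names, a single-thread executor stands in for the forked execution, and the blocking join() is used for brevity where Ratpack would rejoin the original execution without blocking a thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Plain-JDK analogy of the collect-then-join behaviour; all names are hypothetical.
final class ForkAndJoinSketch {
    private ForkAndJoinSketch() {}

    // Drains `source` on a forked thread, then replays every value on the calling thread.
    static <T> void forkAndJoin(Iterable<T> source, Consumer<T> downstream) {
        ExecutorService forked = Executors.newSingleThreadExecutor();
        try {
            List<T> collected = CompletableFuture.supplyAsync(() -> {
                List<T> buffer = new ArrayList<>();
                for (T item : source) {
                    buffer.add(item); // the whole source is buffered, hence the size warning
                }
                return buffer;
            }, forked).join(); // "join": wait for the forked work to finish
            for (T item : collected) {
                downstream.accept(item); // emitted from the original (calling) thread
            }
        } finally {
            forked.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        forkAndJoin(Arrays.asList(1, 2, 3), seen::add);
        System.out.println(seen); // prints [1, 2, 3]
    }
}
```

The buffer makes the cost visible: nothing reaches the downstream consumer until the source is exhausted, so an unbounded source would never be delivered.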
T - the type of item observed
execControl - the execution control
source - the observable source
public static <T> Observable<T> observe(Promise<T> promise)
Converts a Ratpack promise into an Rx observable.
For example, this can be used to observe blocking operations.
In Java…
    import ratpack.handling.Handler;
    import ratpack.handling.Context;
    import ratpack.exec.Promise;
    import java.util.concurrent.Callable;
    import rx.functions.Func1;
    import rx.functions.Action1;

    import static ratpack.rx.RxRatpack.observe;

    public class ReactiveHandler implements Handler {
      // final so that the anonymous classes below can capture it on pre-Java 8 compilers
      public void handle(final Context context) {
        Promise<String> promise = context.blocking(new Callable<String>() {
          public String call() {
            // do some blocking IO here
            return "hello world";
          }
        });

        observe(promise).map(new Func1<String, String>() {
          public String call(String input) {
            return input.toUpperCase();
          }
        }).subscribe(new Action1<String>() {
          public void call(String str) {
            context.render(str); // renders: HELLO WORLD
          }
        });
      }
    }
A similar example in the Groovy DSL would look like:
    import static ratpack.rx.RxRatpack.observe

    handler {
      observe(blocking {
        // do some blocking IO
        "hello world"
      }) map {
        it.toUpperCase()
      } subscribe {
        render it // renders: HELLO WORLD
      }
    }
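T - the type of value promised
promise - the promise
public static <T,I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise)
Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable.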
The promised iterable will be emitted to the observer one element at a time.
For example, this can be used to observe background operations that produce some kind of iterable…
    import static ratpack.rx.RxRatpack.observeEach

    handler {
      observeEach(blocking {
        // do some blocking IO and return a List<String>
        // each item in the List is emitted to the next Observable, not the List
        ["a", "b", "c"]
      }) map { String input ->
        input.toUpperCase()
      } subscribe {
        println it
      }
    }

The output would be:

    A
    B
    C
T - the element type of the promised iterable
I - the type of iterable
promise - the promise
See also: observe(ratpack.exec.Promise)
public static <T> Observable<T> forkOnNext(ExecControl execControl, Observable<T> observable)
This method is an alternative to forkOnNext(ratpack.exec.ExecControl) and is functionally equivalent.
T - the element type
execControl - the execution control to use to fork executions
observable - the observable sequence whose elements are each processed in a forked execution
public static <T> Observable.Operator<T,T> forkOnNext(ExecControl execControl)
An operator to parallelize an observable stream by forking a new execution for each emitted item.
To be used with the Observable.lift(rx.Observable.Operator) method.
The onCompleted() or onError() downstream methods are guaranteed to be called after the last item has been given to the downstream onNext() method. That is, the last invocation of the downstream onNext() will have returned before onCompleted() or onError() is invoked.
This is generally a more performant alternative to using Observable.parallel(rx.functions.Func1<rx.Observable<T>, rx.Observable<R>>) due to Ratpack's Execution semantics and use of Netty's event loop to schedule work.
    import ratpack.rx.RxRatpack;
    import ratpack.exec.ExecController;
    import ratpack.launch.LaunchConfigBuilder;
    import rx.Observable;
    import rx.functions.Func1;
    import rx.functions.Action1;
    import java.util.List;
    import java.util.Arrays;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.BrokenBarrierException;

    public class Example {
      public static void main(String[] args) throws Exception {
        RxRatpack.initialize();

        final CyclicBarrier barrier = new CyclicBarrier(5);
        final ExecController execController = LaunchConfigBuilder.noBaseDir().build().getExecController();

        Observable<Integer> source = Observable.from(1, 2, 3, 4, 5);
        List<Integer> doubledAndSorted = source
          .lift(RxRatpack.<Integer>forkOnNext(execController.getControl()))
          .map(new Func1<Integer, Integer>() {
            public Integer call(Integer integer) {
              try {
                barrier.await(); // prove stream is processed concurrently
              } catch (InterruptedException | BrokenBarrierException e) {
                throw new RuntimeException(e);
              }
              return integer.intValue() * 2;
            }
          })
          .serialize()
          .toSortedList()
          .toBlocking()
          .single();

        try {
          assert doubledAndSorted.equals(Arrays.asList(2, 4, 6, 8, 10));
        } finally {
          execController.close();
        }
      }
    }
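The ordering guarantee stated for forkOnNext (the completion signal is delivered only after the last downstream onNext() has returned) can be illustrated without Ratpack or RxJava. The sketch below is an analogy under stated assumptions: ForkOnNextSketch is a hypothetical class, a fixed thread pool stands in for forked executions, and only the onCompleted() path is modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

// Plain-JDK analogy of "fork per item, complete last"; all names are hypothetical.
final class ForkOnNextSketch {
    private ForkOnNextSketch() {}

    // Runs onNext for each item as its own pool task; onCompleted runs only after all have returned.
    static <T> void forkOnNext(List<T> items, Consumer<T> onNext, Runnable onCompleted) {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, items.size()));
        try {
            List<CompletableFuture<Void>> inFlight = new ArrayList<>();
            for (T item : items) {
                inFlight.add(CompletableFuture.runAsync(() -> onNext.accept(item), pool));
            }
            // Wait until every onNext call has returned before signalling completion.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            onCompleted.run();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> events = Collections.synchronizedList(new ArrayList<String>());
        forkOnNext(Arrays.asList(1, 2, 3), i -> events.add("next"), () -> events.add("completed"));
        System.out.println(events.get(events.size() - 1)); // prints completed
    }
}
```

The items may be processed in any order relative to each other, which is why the Ratpack example above sorts the results before asserting on them.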
T - the type of item in the stream
execControl - the execution control to use to fork executions
public static <T> Subscriber<? super T> subscriber(Fulfiller<T> fulfiller)