public abstract class RxRatpack extends Object
IMPORTANT: the initialize() method must be called to fully enable integration.
The methods of this class provide bi-directional conversion between Ratpack's Promise and RxJava's Observable.
This allows Ratpack's promise-based API to be integrated into an RxJava-based app, and vice versa.
Conveniently, the initialize() method installs an RxJava extension that provides a default error handling strategy for observables that integrates with Ratpack's execution model.
To test observable-based services that use Ratpack's execution semantics, use the ExecHarness and convert the observable back to a promise with promise(Observable).
The methods in this class are also provided as Groovy Extensions.
When using Groovy, each static method in this class can act as an instance-level method on the Observable type.
Modifier and Type | Method and Description |
---|---|
static <T> Observable<T> | bindExec(Observable<T> source): Binds the given observable to the current execution, allowing integration of third-party asynchronous observables with Ratpack's execution model. |
static <T> Observable<T> | forkEach(Observable<T> observable): Parallelizes an observable by creating a new Ratpack execution for each element. |
static void | initialize(): Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler. |
static <T> Observable<T> | observe(Promise<T> promise): Converts a Promise into an Observable. |
static <T,I extends Iterable<T>> Observable<T> | observeEach(Promise<I> promise): Converts a Promise for an iterable into an Observable. |
static <T> Promise<List<T>> | promise(Observable<T> observable): Converts an Observable into a Promise, for all of the observable's items. |
static <T> Promise<T> | promiseSingle(Observable<T> observable): Converts an Observable into a Promise, for the observable's single item. |
static Scheduler | scheduler(ExecController execController): A scheduler that uses the application event loop and initialises each job as an Execution (via ExecControl.fork()). |
public static void initialize()
Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
For a Java application, a convenient place to call this is in the handler factory implementation.
import ratpack.error.ServerErrorHandler;
import ratpack.rx.RxRatpack;
import ratpack.test.embed.EmbeddedApp;
import rx.Observable;
import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    RxRatpack.initialize(); // must be called once for the life of the JVM

    EmbeddedApp.fromHandlers(chain -> chain
      .register(s -> s
        .add(ServerErrorHandler.class, (ctx, throwable) ->
          ctx.render("caught by error handler: " + throwable.getMessage())
        )
      )
      // the observable's error is forwarded to the ServerErrorHandler registered above
      .get(ctx -> Observable.<String>error(new Exception("!")).subscribe(ctx::render))
    ).test(httpClient ->
      assertEquals("caught by error handler: !", httpClient.getText())
    );
  }
}
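The idempotence described for initialize() can be pictured with a compare-and-set guard. The following is a minimal sketch using only the JDK; the class IdempotentInitSketch and its REGISTRATIONS counter are hypothetical, and nothing here is taken from how Ratpack actually implements initialize().

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a JVM-wide, idempotent initializer (not Ratpack's code).
class IdempotentInitSketch {
    private static final AtomicBoolean INSTALLED = new AtomicBoolean(false);

    // Counts how many times the pretend hook registration actually ran.
    static final AtomicInteger REGISTRATIONS = new AtomicInteger();

    static void initialize() {
        // compareAndSet succeeds for exactly one caller, even under concurrent calls.
        if (INSTALLED.compareAndSet(false, true)) {
            REGISTRATIONS.incrementAndGet(); // stands in for registering the hook
        }
    }

    public static void main(String[] args) {
        initialize();
        initialize(); // second call is a no-op
        System.out.println(REGISTRATIONS.get()); // prints 1
    }
}
```

Because the flag is static, the guard holds per class loader, which is why several applications in one JVM could all call such a method safely.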
public static <T> Observable<T> observe(Promise<T> promise)
Converts a Promise into an Observable.
The returned observable emits the promise's single value if it succeeds, and emits the error (i.e. via onError()) if it fails.
This method works well as a method reference to the Promise.to(ratpack.func.Function) method.
import ratpack.rx.RxRatpack;
import ratpack.test.exec.ExecHarness;
import static org.junit.Assert.assertEquals;

public class Example {
  public static String value;

  public static void main(String... args) throws Exception {
    ExecHarness.runSingle(e ->
      e.promiseOf("hello world")
        .to(RxRatpack::observe) // Promise<String> -> Observable<String>
        .map(String::toUpperCase)
        .subscribe(s -> value = s)
    );
    assertEquals("HELLO WORLD", value);
  }
}
Type Parameters:
T - the type of value promised
Parameters:
promise - the promise
public static <T,I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise)
Converts a Promise for an iterable into an Observable.
The promised iterable will be emitted to the observer one element at a time, like Observable.from(Iterable).
import ratpack.rx.RxRatpack;
import ratpack.test.exec.ExecHarness;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String... args) throws Exception {
    final List<String> items = new LinkedList<>();
    ExecHarness.runSingle(e ->
      e.promiseOf(Arrays.asList("foo", "bar"))
        .to(RxRatpack::observeEach) // emits "foo", then "bar"
        .subscribe(items::add)
    );
    assertEquals(Arrays.asList("foo", "bar"), items);
  }
}
Type Parameters:
T - the element type of the promised iterable
I - the type of iterable
Parameters:
promise - the promise
See Also:
observe(ratpack.exec.Promise)
public static <T> Promise<List<T>> promise(Observable<T> observable) throws UnmanagedThreadException
Converts an Observable into a Promise, for all of the observable's items.
This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution.
import ratpack.rx.RxRatpack;
import ratpack.test.exec.ExecHarness;
import rx.Observable;
import java.util.List;
import java.util.Arrays;
import static org.junit.Assert.assertEquals;

public class Example {
  static class AsyncService {
    // emits the value from a thread that Ratpack does not manage
    public <T> Observable<T> observe(final T value) {
      return Observable.create(subscriber ->
        new Thread(() -> {
          subscriber.onNext(value);
          subscriber.onCompleted();
        }).start()
      );
    }
  }

  public static void main(String[] args) throws Throwable {
    List<String> results = ExecHarness.yieldSingle(execution ->
      RxRatpack.promise(new AsyncService().observe("foo"))
    ).getValue();
    assertEquals(Arrays.asList("foo"), results);
  }
}
This method uses Observable.toList() to collect the observable's contents into a list.
It therefore should not be used with observables with many or infinite items.
If it is expected that the observable only emits one element, it is typically more convenient to use promiseSingle(rx.Observable).
If the observable emits an error, the returned promise will fail with that error.
This method must be called during an execution.
Type Parameters:
T - the type of the value observed
Parameters:
observable - the observable
Throws:
UnmanagedThreadException - if called outside of an execution
See Also:
promiseSingle(Observable)
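The collect-everything behaviour described for promise(Observable) can be illustrated without Ratpack or RxJava. The following is a rough sketch using only the JDK: the Source and Observer interfaces are hypothetical stand-ins for an observable and its subscriber, and CompletableFuture stands in for Promise. It is not the library's implementation, and it assumes the source emits from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class CollectAllSketch {

    /** Hypothetical push-style observer, mirroring onNext/onCompleted/onError. */
    interface Observer<T> {
        void onNext(T item);
        void onCompleted();
        void onError(Throwable error);
    }

    /** Hypothetical push-style source; stands in for an Observable. */
    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    /** Buffers every item in a list and completes the future once the source completes. */
    static <T> CompletableFuture<List<T>> collect(Source<T> source) {
        CompletableFuture<List<T>> future = new CompletableFuture<>();
        List<T> items = new ArrayList<>(); // grows with every item emitted
        source.subscribe(new Observer<T>() {
            @Override public void onNext(T item) { items.add(item); }
            @Override public void onCompleted() { future.complete(items); }
            @Override public void onError(Throwable error) { future.completeExceptionally(error); }
        });
        return future;
    }

    public static void main(String[] args) {
        // Emits from a separate thread, like the AsyncService in the example above.
        Source<String> async = observer -> new Thread(() -> {
            observer.onNext("foo");
            observer.onNext("bar");
            observer.onCompleted();
        }).start();
        System.out.println(collect(async).join()); // prints [foo, bar]
    }
}
```

The buffer is only released when the source completes, which is the reason given above for avoiding sources with many or infinite items. An error from the source fails the future, matching the documented failure behaviour of the returned promise.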
public static <T> Promise<T> promiseSingle(Observable<T> observable) throws UnmanagedThreadException
Converts an Observable into a Promise, for the observable's single item.
This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution.
import ratpack.rx.RxRatpack;
import ratpack.test.exec.ExecHarness;
import rx.Observable;
import static org.junit.Assert.assertEquals;

public class Example {
  static class AsyncService {
    // emits the value from a thread that Ratpack does not manage
    public <T> Observable<T> observe(final T value) {
      return Observable.create(subscriber ->
        new Thread(() -> {
          subscriber.onNext(value);
          subscriber.onCompleted();
        }).start()
      );
    }
  }

  public static void main(String[] args) throws Throwable {
    String result = ExecHarness.yieldSingle(execution ->
      RxRatpack.promiseSingle(new AsyncService().observe("foo"))
    ).getValue();
    assertEquals("foo", result);
  }
}
This method uses Observable.single() to enforce that the observable only emits one item.
If the observable may be empty, then use Observable.singleOrDefault(Object) to provide a default value.
import ratpack.rx.RxRatpack;
import ratpack.test.exec.ExecHarness;
import rx.Observable;
import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String[] args) throws Throwable {
    String result = ExecHarness.yieldSingle(execution ->
      // the empty observable falls back to the default value "foo"
      RxRatpack.promiseSingle(Observable.<String>empty().singleOrDefault("foo"))
    ).getValue();
    assertEquals("foo", result);
  }
}
If it is expected that the observable may emit more than one element, use promise(rx.Observable).
If the observable emits an error, the returned promise will fail with that error.
If the observable emits no items, the returned promise will fail with a NoSuchElementException.
If the observable emits more than one item, the returned promise will fail with an IllegalStateException.
This method must be called during an execution.
Type Parameters:
T - the type of the value observed
Parameters:
observable - the observable
Throws:
UnmanagedThreadException - if called outside of an execution
See Also:
promise(Observable)
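The single-item contract listed for promiseSingle(Observable) (exactly one item succeeds, zero or several fail) can be restated over a plain Iterable. This is only an illustrative sketch using the JDK: SingleItemSketch and its single method are hypothetical names, and the exception types simply mirror the ones stated in the text above rather than anything verified against the library.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;

class SingleItemSketch {

    /** Returns the only element, or throws following the contract described above. */
    static <T> T single(Iterable<T> items) {
        Iterator<T> it = items.iterator();
        if (!it.hasNext()) {
            throw new NoSuchElementException("no items");
        }
        T value = it.next();
        if (it.hasNext()) {
            throw new IllegalStateException("more than one item");
        }
        return value;
    }

    public static void main(String[] args) {
        System.out.println(single(Arrays.asList("foo"))); // prints foo
        try {
            single(Collections.<String>emptyList());
        } catch (NoSuchElementException e) {
            System.out.println("empty input rejected");
        }
        try {
            single(Arrays.asList("a", "b"));
        } catch (IllegalStateException e) {
            System.out.println("multiple items rejected");
        }
    }
}
```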
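public static <T> Observable<T> bindExec(Observable<T> source)
Binds the given observable to the current execution, allowing integration of third-party asynchronous observables with Ratpack's execution model.
This method is useful when you want to consume an asynchronous observable within a Ratpack execution, as an observable.
It is just a combination of promise(Observable) and observeEach(Promise).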
import rx.Observable;
import ratpack.test.exec.ExecHarness;
import ratpack.rx.RxRatpack;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.*;

public class Example {
  public static void main(String... args) throws Exception {
    // emits from a thread that Ratpack does not manage
    Observable<String> asyncObservable = Observable.create(subscriber ->
      new Thread(() -> {
        subscriber.onNext("foo");
        subscriber.onNext("bar");
        subscriber.onCompleted();
      }).start()
    );

    List<String> strings = ExecHarness.yieldSingle(e ->
      RxRatpack.promise(asyncObservable.compose(RxRatpack::bindExec))
    ).getValue();
    assertEquals(Arrays.asList("foo", "bar"), strings);
  }
}
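Type Parameters:
T - the type of item observed
Parameters:
source - the observable source
See Also:
observeEach(Promise), promise(Observable)
public static <T> Observable<T> forkEach(Observable<T> observable)
Parallelizes an observable by creating a new Ratpack execution for each element.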
import ratpack.rx.RxRatpack;
import ratpack.util.Exceptions;
import ratpack.test.exec.ExecHarness;
import rx.Observable;
import java.util.List;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CyclicBarrier;
import static org.junit.Assert.assertEquals;

public class Example {
  public static void main(String[] args) throws Exception {
    RxRatpack.initialize();
    CyclicBarrier barrier = new CyclicBarrier(5);

    try (ExecHarness execHarness = ExecHarness.harness(6)) {
      List<Integer> values = execHarness.yield(execution ->
        RxRatpack.promise(
          Observable.just(1, 2, 3, 4, 5)
            .compose(RxRatpack::forkEach) // parallelize
            .doOnNext(value -> Exceptions.uncheck(() -> barrier.await())) // wait for all values
            .map(integer -> integer.intValue() * 2)
            .serialize()
        )
      ).getValue();

      // arrival order is not guaranteed, so sort before comparing
      List<Integer> sortedValues = new LinkedList<>(values);
      Collections.sort(sortedValues);
      assertEquals(Arrays.asList(2, 4, 6, 8, 10), sortedValues);
    }
  }
}
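The CyclicBarrier in the example above is what demonstrates the parallelism: a barrier of five can only trip if all five elements are being processed at the same time. The same reasoning can be sketched with only the JDK. Here a fixed thread pool stands in for forked executions; BarrierParallelSketch and doubleInParallel are hypothetical names, and this is not how forkEach is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class BarrierParallelSketch {

    /** Doubles each input on its own thread; assumes a non-empty input list. */
    static List<Integer> doubleInParallel(List<Integer> inputs) throws Exception {
        CyclicBarrier barrier = new CyclicBarrier(inputs.size());
        ExecutorService pool = Executors.newFixedThreadPool(inputs.size());
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (Integer input : inputs) {
                tasks.add(() -> {
                    // Trips only once every task has arrived, so sequential processing
                    // would time out here instead of completing.
                    barrier.await(5, TimeUnit.SECONDS);
                    return input * 2;
                });
            }
            List<Integer> results = new ArrayList<>();
            // invokeAll returns futures in the same order as the submitted tasks.
            for (Future<Integer> future : pool.invokeAll(tasks)) {
                results.add(future.get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(doubleInParallel(Arrays.asList(1, 2, 3, 4, 5))); // prints [2, 4, 6, 8, 10]
    }
}
```

Unlike this sketch, the Ratpack example sorts its results before asserting, since items from forked executions may arrive in any order.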
Type Parameters:
T - the element type
Parameters:
observable - the observable sequence whose elements are each processed in a forked execution
public static Scheduler scheduler(ExecController execController)
A scheduler that uses the application event loop and initialises each job as an Execution (via ExecControl.fork()).
Parameters:
execController - the execution controller to back the scheduler