public interface RxBackground
Similar to Background, except that an Observable for the background result is returned.
Using this type for background operations is preferable to using Background directly, because observables can be composed.
The RxModule provides this type.
The observables returned by observe(Callable) and observeEach(Callable) are integrated into the standard Ratpack error handling mechanism.
Any unhandled error is forwarded to the error handler of the context that was active when the background was entered.
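The forwarding rule above can be pictured with a small, self-contained sketch. It is not Ratpack's implementation: `ErrorForwardingSketch`, its `run` method, and the `contextErrorHandler` parameter are hypothetical names that stand in for the real subscription machinery and for the context's error handler. The sketch assumes that "unhandled" means the subscriber supplied no error callback of its own.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration; not a Ratpack or RxJava type.
final class ErrorForwardingSketch {

  // Runs the blocking operation and routes its outcome.
  // A null onError models a subscriber that does not handle errors, in which
  // case the failure is passed to contextErrorHandler (the stand-in for the
  // error handler of the context that was active when the work was started).
  static <T> void run(Callable<T> callable,
                      Consumer<? super T> onNext,
                      Consumer<? super Throwable> onError,
                      Consumer<? super Throwable> contextErrorHandler) {
    T value;
    try {
      value = callable.call();
    } catch (Exception e) {
      if (onError != null) {
        onError.accept(e);             // the subscriber handles the error itself
      } else {
        contextErrorHandler.accept(e); // unhandled: forward to the context
      }
      return;
    }
    onNext.accept(value);
  }
}
```

In this model, a handler that only supplies an `onNext` callback still has its failures reported, because the fallback is fixed at the point where the background work is requested.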
Modifier and Type | Method and Description |
---|---|
<T> Observable<T> | observe(Callable<T> callable) |
<I extends Iterable<T>,T> Observable<T> | observeEach(Callable<I> callable) |
<T> Observable<T> observe(Callable<T> callable)
Returns an Observable that will execute the given Callable when an Observer subscribes to it.
The Observer's onNext method will be called exactly once with the result of the Callable.
    import ratpack.handling.Handler;
    import ratpack.handling.Context;
    import ratpack.rx.RxBackground;

    import javax.inject.Inject;
    import java.util.concurrent.Callable;
    import rx.util.functions.Func1;
    import rx.util.functions.Action1;

    public class ReactiveHandler implements Handler {
      private final RxBackground rxBackground;

      @Inject
      public ReactiveHandler(RxBackground rxBackground) {
        this.rxBackground = rxBackground;
      }

      // The context is declared final so the anonymous Action1 below can capture it.
      public void handle(final Context context) {
        rxBackground.observe(new Callable<String>() {
          public String call() {
            // do some blocking IO here
            return "hello world";
          }
        }).map(new Func1<String, String>() {
          public String call(String input) {
            return input.toUpperCase();
          }
        }).subscribe(new Action1<String>() {
          public void call(String str) {
            context.render(str); // renders: HELLO WORLD
          }
        });
      }
    }
A similar example in the Groovy DSL would look like:
    import ratpack.rx.RxBackground

    handler { RxBackground rxBackground ->
      rxBackground.observe {
        // do some blocking IO
        "hello world"
      } map { String input ->
        input.toUpperCase()
      } subscribe {
        render it // renders: HELLO WORLD
      }
    }
As with Background.exec(Callable), the Callable should do little more than call a blocking operation and return its value.
See the section describing error handling on RxModule.
Type Parameters:
T - The type of value returned by the blocking operation
Parameters:
callable - The blocking operation
Returns:
an Observable of the blocking operation outcome
See Also:
observeEach(Callable)
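The contract of observe(Callable) described above (nothing runs until there is a subscriber, and the subscriber then receives exactly one value) can be illustrated without any library. The sketch below is a deliberately simplified model, not RxJava or Ratpack code: `DeferredSketch` and its methods are hypothetical names, and everything runs synchronously on the calling thread, whereas the real Callable is executed in the background.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, synchronous model of a single-value deferred result.
final class DeferredSketch<T> {
  private final Callable<T> callable;

  private DeferredSketch(Callable<T> callable) {
    this.callable = callable;
  }

  // Wraps the blocking operation without running it.
  static <T> DeferredSketch<T> observe(Callable<T> callable) {
    return new DeferredSketch<>(callable);
  }

  // Composes a transformation; still nothing is executed here.
  <R> DeferredSketch<R> map(Function<? super T, ? extends R> fn) {
    return new DeferredSketch<>(() -> fn.apply(callable.call()));
  }

  // Runs the operation now and delivers its single result to onNext.
  void subscribe(Consumer<? super T> onNext) {
    T value;
    try {
      value = callable.call();
    } catch (Exception e) {
      throw new IllegalStateException("blocking operation failed", e);
    }
    onNext.accept(value);
  }
}
```

Keeping the Callable limited to the blocking call and moving the transformation into `map` mirrors the advice above: the wrapped operation stays minimal, and further processing is composed on the returned value.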
<I extends Iterable<T>,T> Observable<T> observeEach(Callable<I> callable)
Returns an Observable that will execute the given Callable when an Observer subscribes to it.
The Observer's onNext method will be called once for each item in the Callable's Iterable result.
For example, when a Callable returns a List<String> and each String in the List must be uppercased, RxBackground can be used like this:
    import ratpack.rx.RxBackground

    handler { RxBackground rxBackground ->
      rxBackground.observeEach {
        // do some blocking IO and return a List<String>
        // each item in the List is emitted to the next Observable, not the List
        ["a", "b", "c"]
      } map { String input ->
        input.toUpperCase()
      } subscribe {
        println it
      }
    }

The output would be:

    A
    B
    C
As with Background.exec(Callable), the Callable should do little more than call a blocking operation and return its value.
See the section describing error handling on RxModule.
Type Parameters:
I - The Iterable type of the value returned by the blocking operation
T - The type of the items in the Iterable I
Parameters:
callable - The blocking operation
Returns:
an Observable of the blocking operation outcome
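The difference between the two methods is how the result reaches onNext: once as a whole for observe(Callable), once per element for observeEach(Callable). The following sketch models only that difference. `EachSketch` and its static methods are hypothetical helpers, not part of Ratpack or RxJava, and they run synchronously for simplicity.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;

// Hypothetical illustration only; these are not Ratpack or RxJava types.
final class EachSketch {

  // Models observe(Callable): the whole result is delivered in a single call.
  static <T> void observe(Callable<T> callable, Consumer<? super T> onNext) {
    onNext.accept(call(callable));
  }

  // Models observeEach(Callable): one call per element of the Iterable result.
  static <T> void observeEach(Callable<? extends Iterable<T>> callable,
                              Consumer<? super T> onNext) {
    for (T item : call(callable)) {
      onNext.accept(item);
    }
  }

  // Invokes the blocking operation, wrapping checked failures.
  private static <V> V call(Callable<V> callable) {
    try {
      return callable.call();
    } catch (Exception e) {
      throw new IllegalStateException("blocking operation failed", e);
    }
  }
}
```

With a Callable that returns a three-element list, `observe` in this model invokes the consumer once with the list itself, while `observeEach` invokes it three times, which is why a per-item transformation such as uppercasing can be applied directly to each String.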