public abstract class RxRatpack extends Object
IMPORTANT: the initialize() method must be called to fully enable integration.
Constructor and Description |
---|
RxRatpack() |
Modifier and Type | Method and Description |
---|---|
static void | initialize() Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the exec context. |
static <T> Observable<T> | observe(Promise<T> promise) Converts a Ratpack promise into an Rx observable. |
static <T,I extends Iterable<T>> Observable<T> | observeEach(Promise<I> promise) Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable. |
public static void initialize()
Registers an RxJavaObservableExecutionHook with RxJava that provides a default error handling strategy of forwarding exceptions to the exec context.
This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
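As an illustration of what idempotent means here, the following is a minimal sketch of a once-per-JVM initializer. It is not the RxRatpack source: the class `OnceInitializer` and its `REGISTRATIONS` counter are hypothetical names invented for this example, and the counter merely stands in for registering the hook with RxJava.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a class with an idempotent static initialize(); NOT the RxRatpack source.
final class OnceInitializer {
    // Flips to true on the first call and stays true for the life of the JVM.
    private static final AtomicBoolean INITIALIZED = new AtomicBoolean(false);

    // Counts how often the one-time work actually ran (for demonstration only).
    static final AtomicInteger REGISTRATIONS = new AtomicInteger();

    private OnceInitializer() {
    }

    // Safe to call repeatedly and from several threads: only the first call does the work.
    static void initialize() {
        if (INITIALIZED.compareAndSet(false, true)) {
            REGISTRATIONS.incrementAndGet(); // assumption: stands in for the real hook registration
        }
    }
}

class OnceInitializerDemo {
    public static void main(String[] args) {
        OnceInitializer.initialize();
        OnceInitializer.initialize();
        System.out.println(OnceInitializer.REGISTRATIONS.get()); // prints 1
    }
}
```

Because the guard is static, several applications in the same JVM can each call initialize() without coordinating with one another.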
For a Java application, a convenient place to call this is in the handler factory implementation.
    import ratpack.launch.HandlerFactory;
    import ratpack.launch.LaunchConfig;
    import ratpack.handling.Handler;
    import ratpack.handling.Handlers;
    import ratpack.handling.Context;
    import ratpack.handling.ChainAction;
    import ratpack.error.ServerErrorHandler;
    import ratpack.registry.RegistrySpecAction;
    import ratpack.rx.RxRatpack;
    import rx.Observable;
    import rx.functions.Action1;

    public class MyHandlerFactory implements HandlerFactory {
      public Handler create(LaunchConfig launchConfig) {
        // Enable Rx integration
        RxRatpack.initialize();

        return Handlers.chain(launchConfig, new ChainAction() {
          public void execute() {
            register(new RegistrySpecAction() { // register a custom error handler
              public void execute() {
                add(ServerErrorHandler.class, new ServerErrorHandler() {
                  public void error(Context context, Exception exception) {
                    context.render("caught by error handler!");
                  }
                });
              }
            });

            get(new Handler() {
              public void handle(Context context) {
                // An observable sequence with no defined error handler
                // The error will be propagated to context error handler implicitly
                Observable.<String>error(new Exception("!")).subscribe(new Action1<String>() {
                  public void call(String str) {
                    // will never be called
                  }
                });
              }
            });
          }
        });
      }
    }

    // Test (Groovy) …

    import ratpack.test.embed.LaunchConfigEmbeddedApplication
    import ratpack.launch.LaunchConfigBuilder

    import static ratpack.groovy.test.TestHttpClients.testHttpClient

    def app = new LaunchConfigEmbeddedApplication() {
      protected LaunchConfig createLaunchConfig() {
        LaunchConfigBuilder.noBaseDir().build(new MyHandlerFactory());
      }
    }

    def client = testHttpClient(app)

    try {
      client.getText() == "caught by error handler!"
    } finally {
      app.close()
    }
For a Groovy DSL application, initialize() can be called while declaring the module bindings.
    import ratpack.handling.Context
    import ratpack.error.ServerErrorHandler
    import ratpack.rx.RxRatpack
    import rx.Observable

    import static ratpack.groovy.test.embed.EmbeddedApplications.embeddedApp
    import static ratpack.groovy.test.TestHttpClients.testHttpClient

    def app = embeddedApp {
      modules {
        // Enable Rx integration
        RxRatpack.initialize()

        bind ServerErrorHandler, new ServerErrorHandler() {
          void error(Context context, Exception exception) {
            context.render("caught by error handler!")
          }
        }
      }

      handlers {
        get {
          // An observable sequence with no defined error handler
          // The error will be propagated to context error handler implicitly
          Observable.error(new Exception("!")).subscribe {
            // will never happen
          }
        }
      }
    }

    def client = testHttpClient(app)

    try {
      client.getText() == "caught by error handler!"
    } finally {
      app.close()
    }
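The default error handling strategy that initialize() installs can be pictured with a simplified, library-free model. The sketch below is an assumption-laden illustration, not how RxJava hooks or Ratpack are implemented: `ErrorForwardingSketch`, its nested `Observer` interface, `withDefaultErrorHandling` and `failingSource` are all hypothetical names. It shows only the observable behavior: a subscriber that supplies no error callback still has its error delivered to a context-level handler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of forwarding unhandled errors to a context-level handler.
final class ErrorForwardingSketch {

    // Minimal observer: receives either a value or an error.
    interface Observer<T> {
        void onNext(T value);

        void onError(Throwable error);
    }

    // Builds an observer from a value callback only; errors are routed to the fallback handler.
    static <T> Observer<T> withDefaultErrorHandling(final Consumer<T> valueCallback,
                                                    final Consumer<Throwable> contextErrorHandler) {
        return new Observer<T>() {
            @Override
            public void onNext(T value) {
                valueCallback.accept(value);
            }

            @Override
            public void onError(Throwable error) {
                contextErrorHandler.accept(error);
            }
        };
    }

    // A source that fails immediately, comparable to Observable.error(...) in the examples above.
    static <T> void failingSource(Throwable error, Observer<T> observer) {
        observer.onError(error);
    }

    // Runs the scenario and returns what the stand-in "context" rendered.
    static List<String> demo() {
        final List<String> rendered = new ArrayList<>();
        Observer<String> observer = withDefaultErrorHandling(
            value -> rendered.add("value: " + value),           // never called
            error -> rendered.add("caught by error handler!")); // stand-in for the ServerErrorHandler
        failingSource(new Exception("!"), observer);
        return rendered;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [caught by error handler!]
    }
}
```

In the real integration the forwarding is set up globally through the registered hook, so individual handlers do not need to wrap their subscribers by hand.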
public static <T> Observable<T> observe(Promise<T> promise)
Converts a Ratpack promise into an Rx observable.
For example, this can be used to observe blocking operations.
In Java…
    import ratpack.handling.Handler;
    import ratpack.handling.Context;
    import ratpack.exec.Promise;
    import java.util.concurrent.Callable;
    import rx.functions.Func1;
    import rx.functions.Action1;

    import static ratpack.rx.RxRatpack.observe;

    public class ReactiveHandler implements Handler {
      // context is final so the anonymous Action1 below can reference it
      public void handle(final Context context) {
        Promise<String> promise = context.blocking(new Callable<String>() {
          public String call() {
            // do some blocking IO here
            return "hello world";
          }
        });

        observe(promise).map(new Func1<String, String>() {
          public String call(String input) {
            return input.toUpperCase();
          }
        }).subscribe(new Action1<String>() {
          public void call(String str) {
            context.render(str); // renders: HELLO WORLD
          }
        });
      }
    }
A similar example in the Groovy DSL would look like:
    import static ratpack.rx.RxRatpack.observe;

    handler {
      observe(blocking {
        // do some blocking IO
        "hello world"
      }) map { String input ->
        input.toUpperCase()
      } subscribe {
        render it // renders: HELLO WORLD
      }
    }
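Type Parameters:
T - the type of value promised
Parameters:
promise - the promise

public static <T,I extends Iterable<T>> Observable<T> observeEach(Promise<I> promise)
Converts a Ratpack promise of an iterable value into an Rx observable for each element of the promised iterable.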
The promised iterable will be emitted to the observer one element at a time.
For example, this can be used to observe background operations that produce some kind of iterable…
    import static ratpack.rx.RxRatpack.observeEach

    handler {
      observeEach(blocking {
        // do some blocking IO and return a List<String>
        // each item in the List is emitted to the next Observable, not the List
        ["a", "b", "c"]
      }) map { String input ->
        input.toUpperCase()
      } subscribe {
        println it
      }
    }

The output would be:

    A
    B
    C
Type Parameters:
T - the element type of the promised iterable
I - the type of iterable
Parameters:
promise - the promise
See Also:
observe(ratpack.exec.Promise)
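To make the difference between observe and observeEach concrete, here is a small sketch that uses only the JDK. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for a Ratpack promise and a plain `Consumer` as a stand-in for an Rx subscriber. The class `ObserveEachSketch` and its methods are hypothetical and only mirror the names of the methods documented above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical JDK-only model; CompletableFuture stands in for Promise, Consumer for a subscriber.
final class ObserveEachSketch {

    // Like observe: the promised value is delivered to the consumer as a single item.
    static <T> void observe(CompletableFuture<T> promise, Consumer<T> onNext) {
        promise.thenAccept(onNext).join();
    }

    // Like observeEach: every element of the promised iterable is delivered separately.
    static <T, I extends Iterable<T>> void observeEach(CompletableFuture<I> promise, Consumer<T> onNext) {
        promise.thenAccept(iterable -> {
            for (T item : iterable) {
                onNext.accept(item);
            }
        }).join();
    }

    // Records what each style of observation delivers for the same promised list.
    static List<String> demo() {
        List<String> emissions = new ArrayList<>();
        CompletableFuture<List<String>> promise =
            CompletableFuture.completedFuture(Arrays.asList("a", "b", "c"));

        observe(promise, list -> emissions.add("observe: " + list));
        observeEach(promise, item -> emissions.add("observeEach: " + item.toUpperCase()));
        return emissions;
    }

    public static void main(String[] args) {
        // prints [observe: [a, b, c], observeEach: A, observeEach: B, observeEach: C]
        System.out.println(demo());
    }
}
```

With observe the downstream consumer sees one item, the whole list. With observeEach it sees three items, which is why per-element operators such as map can be applied directly.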