Class RxRatpack
- java.lang.Object
-
- ratpack.rx.RxRatpack
-
@Deprecated public abstract class RxRatpack extends java.lang.Object
Deprecated.since 1.7.0. Useratpack-rx2
instead.Provides integration with RxJava.IMPORTANT: the
initialize()
method must be called to fully enable integration.The methods of this class provide bi-directional conversion between Ratpack's
Promise
and RxJava'sObservable
. This allows Ratpack promise based API to be integrated into an RxJava based app and vice versa.Conveniently, the
initialize()
method installs an RxJava extension that provides a default error handling strategy for observables that integrates with Ratpack's execution model.To test observable based services that use Ratpack's execution semantics, use the
ExecHarness
and convert the observable back to a promise withpromise(Observable)
.The methods in this class are also provided as Groovy Extensions. When using Groovy, each static method in this class is able to act as an instance-level method against the
Observable
type.
-
-
Method Summary
All Methods Static Methods Concrete Methods Deprecated Methods Modifier and Type Method Description static <T> Observable<T>
bindExec(Observable<T> source)
Deprecated.Binds the given observable to the current execution, allowing integration of third-party asynchronous observables with Ratpack's execution model.static <T> Observable<T>
fork(Observable<T> observable)
Deprecated.Parallelize an observable by forking it's execution onto a different Ratpack compute thread and automatically binding the result back to the original execution.static <T> Observable<T>
fork(Observable<T> observable, Action<? super RegistrySpec> doWithRegistrySpec)
Deprecated.A variant offork(rx.Observable<T>)
that allows access to the registry of the forked execution inside anAction
.static <T> Observable<T>
forkEach(Observable<T> observable)
Deprecated.Parallelize an observable by creating a new Ratpack execution for each element.static <T> Observable<T>
forkEach(Observable<T> observable, Action<? super RegistrySpec> doWithRegistrySpec)
Deprecated.A variant offorkEach(rx.Observable<T>)
that allows access to the registry of each forked execution inside anAction
.static void
initialize()
Deprecated.Registers anRxJavaObservableExecutionHook
with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.static Observable<java.lang.Void>
observe(Operation operation)
Deprecated.Converts aOperation
into anObservable
.static <T> Observable<T>
observe(Promise<T> promise)
Deprecated.Converts aPromise
into anObservable
.static <T,I extends java.lang.Iterable<T>>
Observable<T>observeEach(Promise<I> promise)
Deprecated.Converts aPromise
for an iterable into anObservable
.static <T> Promise<java.util.List<T>>
promise(Observable.OnSubscribe<T> onSubscribe)
Deprecated.Converts anObservable
into aPromise
, for all of the observable's items.static <T> Promise<java.util.List<T>>
promise(Observable<T> observable)
Deprecated.Converts anObservable
into aPromise
, for all of the observable's items.static <T> Promise<T>
promiseSingle(Observable.OnSubscribe<T> onSubscribe)
Deprecated.Converts anObservable
into aPromise
, for the observable's single item.static <T> Promise<T>
promiseSingle(Observable<T> observable)
Deprecated.Converts anObservable
into aPromise
, for the observable's single item.static <T> TransformablePublisher<T>
publisher(Observable.OnSubscribe<T> onSubscribe)
Deprecated.Converts anObservable
into aPublisher
, for all of the observable's items.static <T> TransformablePublisher<T>
publisher(Observable<T> observable)
Deprecated.Converts anObservable
into aPublisher
, for all of the observable's items.static Scheduler
scheduler()
Deprecated.A scheduler that uses the application event loop and initialises each job as anExecution
(viaExecController.fork()
).static Scheduler
scheduler(ExecController execController)
Deprecated.A scheduler that uses the application event loop and initialises each job as anExecution
(viaExecController.fork()
).
-
-
-
Method Detail
-
initialize
public static void initialize()
Deprecated.Registers anRxJavaObservableExecutionHook
with RxJava that provides a default error handling strategy of forwarding exceptions to the execution error handler.This method is idempotent. It only needs to be called once per JVM, regardless of how many Ratpack applications are running within the JVM.
For a Java application, a convenient place to call this is in the handler factory implementation.
import ratpack.error.ServerErrorHandler; import ratpack.test.embed.EmbeddedApp; import rx.Observable; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String... args) throws Exception { ratpack.rx.RxRatpack.initialize(); // must be called once for the life of the JVM EmbeddedApp.fromHandlers(chain -> chain .register(s -> s .add(ServerErrorHandler.class, (ctx, throwable) -> ctx.render("caught by error handler: " + throwable.getMessage()) ) ) .get(ctx -> Observable.<String>error(new Exception("!")).subscribe(ctx::render)) ).test(httpClient -> assertEquals("caught by error handler: !", httpClient.getText()) ); } }
-
observe
public static <T> Observable<T> observe(Promise<T> promise)
Deprecated.Converts aPromise
into anObservable
.The returned observable emits the promise's single value if it succeeds, and emits the error (i.e. via
onError()
) if it fails.This method works well as a method reference to the
Promise.to(ratpack.func.Function)
method.import ratpack.exec.Promise; import ratpack.test.exec.ExecHarness; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static String value; public static void main(String... args) throws Exception { ExecHarness.runSingle(e -> Promise.value("hello world") .to(ratpack.rx.RxRatpack::observe) .map(String::toUpperCase) .subscribe(s -> value = s) ); assertEquals("HELLO WORLD", value); } }
- Type Parameters:
T
- the type of value promised- Parameters:
promise
- the promise- Returns:
- an observable for the promised value
-
observe
public static Observable<java.lang.Void> observe(Operation operation)
Deprecated.Converts aOperation
into anObservable
.The returned observable emits completes upon completion of the operation without emitting a value, and emits the error (i.e. via
onError()
) if it fails.import ratpack.exec.Operation; import ratpack.test.exec.ExecHarness; import static org.junit.Assert.assertTrue; @SuppressWarnings("deprecation") public class Example { public static boolean executed; public static void main(String... args) throws Exception { ExecHarness.runSingle(e -> Operation.of(() -> executed = true) .to(ratpack.rx.RxRatpack::observe) .subscribe() ); assertTrue(executed); } }
- Parameters:
operation
- the operation- Returns:
- an observable for the operation
-
observeEach
public static <T,I extends java.lang.Iterable<T>> Observable<T> observeEach(Promise<I> promise)
Deprecated.Converts aPromise
for an iterable into anObservable
.The promised iterable will be emitted to the observer one element at a time, like
Observable.from(Iterable)
.import ratpack.exec.Promise; import ratpack.test.exec.ExecHarness; import java.util.Arrays; import java.util.LinkedList; import java.util.List; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String... args) throws Exception { final List<String> items = new LinkedList<>(); ExecHarness.runSingle(e -> Promise.value(Arrays.asList("foo", "bar")) .to(ratpack.rx.RxRatpack::observeEach) .subscribe(items::add) ); assertEquals(Arrays.asList("foo", "bar"), items); } }
- Type Parameters:
T
- the element type of the promised iterableI
- the type of iterable- Parameters:
promise
- the promise- Returns:
- an observable for each element of the promised iterable
- See Also:
observe(ratpack.exec.Promise)
-
promise
public static <T> Promise<java.util.List<T>> promise(Observable<T> observable) throws UnmanagedThreadException
Deprecated.Converts anObservable
into aPromise
, for all of the observable's items.This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution. It is sometimes more convenient to use
promise(Observable.OnSubscribe)
over this method.import ratpack.test.exec.ExecHarness; import rx.Observable; import java.util.List; import java.util.Arrays; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { static class AsyncService { public <T> Observable<T> observe(final T value) { return Observable.create(subscriber -> new Thread(() -> { subscriber.onNext(value); subscriber.onCompleted(); }).start() ); } } public static void main(String[] args) throws Throwable { List<String> results = ExecHarness.yieldSingle(execution -> ratpack.rx.RxRatpack.promise(new AsyncService().observe("foo")) ).getValue(); assertEquals(Arrays.asList("foo"), results); } }
This method uses
Observable.toList()
to collect the observable's contents into a list. It therefore should not be used with observables with many or infinite items.If it is expected that the observable only emits one element, it is typically more convenient to use
promiseSingle(rx.Observable)
.If the observable emits an error, the returned promise will fail with that error.
This method must be called during an execution.
- Type Parameters:
T
- the type of the value observed- Parameters:
observable
- the observable- Returns:
- a promise that returns all values from the observable
- Throws:
UnmanagedThreadException
- if called outside of an execution- See Also:
promiseSingle(Observable)
-
promise
public static <T> Promise<java.util.List<T>> promise(Observable.OnSubscribe<T> onSubscribe) throws UnmanagedThreadException
Deprecated.Converts anObservable
into aPromise
, for all of the observable's items.This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution. It is intended to be used in conjunction with the
Observable.extend(rx.functions.Func1<? super rx.Observable.OnSubscribe<T>, ? extends R>)
method as a method reference.import ratpack.test.exec.ExecHarness; import rx.Observable; import java.util.List; import java.util.Arrays; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { static class AsyncService { public <T> Observable<T> observe(final T value) { return Observable.create(subscriber -> new Thread(() -> { subscriber.onNext(value); subscriber.onCompleted(); }).start() ); } } public static void main(String[] args) throws Throwable { List<String> results = ExecHarness.yieldSingle(execution -> new AsyncService().observe("foo").extend(ratpack.rx.RxRatpack::promise) ).getValue(); assertEquals(Arrays.asList("foo"), results); } }
This method uses
Observable.toList()
to collect the observable's contents into a list. It therefore should not be used with observables with many or infinite items.If it is expected that the observable only emits one element, it is typically more convenient to use
promiseSingle(rx.Observable)
.If the observable emits an error, the returned promise will fail with that error.
This method must be called during an execution.
- Type Parameters:
T
- the type of the value observed- Parameters:
onSubscribe
- the on subscribe function- Returns:
- a promise that returns all values from the observable
- Throws:
UnmanagedThreadException
- if called outside of an execution- See Also:
promiseSingle(Observable)
,promise(Observable)
-
promiseSingle
public static <T> Promise<T> promiseSingle(Observable<T> observable) throws UnmanagedThreadException
Deprecated.Converts anObservable
into aPromise
, for the observable's single item.This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution. It is sometimes more convenient to use
promiseSingle(Observable.OnSubscribe)
over this method.import ratpack.test.exec.ExecHarness; import rx.Observable; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { static class AsyncService { public <T> Observable<T> observe(final T value) { return Observable.create(subscriber -> new Thread(() -> { subscriber.onNext(value); subscriber.onCompleted(); }).start() ); } } public static void main(String[] args) throws Throwable { String result = ExecHarness.yieldSingle(execution -> ratpack.rx.RxRatpack.promiseSingle(new AsyncService().observe("foo")) ).getValue(); assertEquals("foo", result); } }
This method uses
Observable.single()
to enforce that the observable only emits one item. If the observable may be empty, then useObservable.singleOrDefault(Object)
to provide a default value.import ratpack.test.exec.ExecHarness; import rx.Observable; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String[] args) throws Throwable { String result = ExecHarness.yieldSingle(execution -> ratpack.rx.RxRatpack.promiseSingle(Observable.<String>empty().singleOrDefault("foo")) ).getValue(); assertEquals("foo", result); } }
If it is expected that the observable may emit more than one element, use
promise(rx.Observable)
.If the observable emits an error, the returned promise will fail with that error. If the observable emits no items, the returned promise will fail with a
NoSuchElementException
. If the observable emits more than one item, the returned promise will fail with anIllegalStateException
.This method must be called during an execution.
- Type Parameters:
T
- the type of the value observed- Parameters:
observable
- the observable- Returns:
- a promise that returns the sole value from the observable
- Throws:
UnmanagedThreadException
- See Also:
promise(Observable)
,promiseSingle(Observable.OnSubscribe)
-
promiseSingle
public static <T> Promise<T> promiseSingle(Observable.OnSubscribe<T> onSubscribe) throws UnmanagedThreadException
Deprecated.Converts anObservable
into aPromise
, for the observable's single item.This method can be used to simply adapt an observable to a promise, but can also be used to bind an observable to the current execution. It is intended to be used in conjunction with the
Observable.extend(rx.functions.Func1<? super rx.Observable.OnSubscribe<T>, ? extends R>)
method as a method reference.import ratpack.test.exec.ExecHarness; import rx.Observable; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { static class AsyncService { public <T> Observable<T> observe(final T value) { return Observable.create(subscriber -> new Thread(() -> { subscriber.onNext(value); subscriber.onCompleted(); }).start() ); } } public static void main(String[] args) throws Throwable { String result = ExecHarness.yieldSingle(execution -> new AsyncService().observe("foo").extend(ratpack.rx.RxRatpack::promiseSingle) ).getValue(); assertEquals("foo", result); } }
This method uses
Observable.single()
to enforce that the observable only emits one item. If the observable may be empty, then useObservable.singleOrDefault(Object)
to provide a default value.
If it is expected that the observable may emit more than one element, useimport ratpack.test.exec.ExecHarness; import rx.Observable; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String[] args) throws Throwable { String result = ExecHarness.yieldSingle(execution -> Observable.<String>empty().singleOrDefault("foo").extend(ratpack.rx.RxRatpack::promiseSingle) ).getValue(); assertEquals("foo", result); } }
promise(rx.Observable.OnSubscribe)
.If the observable emits an error, the returned promise will fail with that error. If the observable emits no items, the returned promise will fail with a
NoSuchElementException
. If the observable emits more than one item, the returned promise will fail with anIllegalStateException
.This method must be called during an execution.
- Type Parameters:
T
- the type of the value observed- Parameters:
onSubscribe
- the on subscribe function- Returns:
- a promise that returns the sole value from the observable
- Throws:
UnmanagedThreadException
- See Also:
promise(Observable.OnSubscribe)
,promiseSingle(Observable)
-
publisher
public static <T> TransformablePublisher<T> publisher(Observable<T> observable)
Deprecated.Converts anObservable
into aPublisher
, for all of the observable's items.This method can be used to simply adapt an observable to a ReactiveStreams publisher. It is sometimes more convenient to use
publisher(Observable.OnSubscribe)
over this method.import ratpack.stream.Streams; import ratpack.test.exec.ExecHarness; import rx.Observable; import java.util.List; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { static class AsyncService { public <T> Observable<T> observe(final T value) { return Observable.create(subscriber -> new Thread(() -> { subscriber.onNext(value); subscriber.onCompleted(); }).start() ); } } public static void main(String[] args) throws Throwable { List<String> result = ExecHarness.yieldSingle(execution -> ratpack.rx.RxRatpack.publisher(new AsyncService().observe("foo")).toList() ).getValue(); assertEquals("foo", result.get(0)); } }
- Type Parameters:
T
- the type of the value observed- Parameters:
observable
- the observable- Returns:
- a ReactiveStreams publisher containing each value of the observable
-
publisher
public static <T> TransformablePublisher<T> publisher(Observable.OnSubscribe<T> onSubscribe)
Deprecated.Converts anObservable
into aPublisher
, for all of the observable's items.This method can be used to simply adapt an observable to a ReactiveStreams publisher. It is intended to be used in conjunction with the
Observable.extend(rx.functions.Func1<? super rx.Observable.OnSubscribe<T>, ? extends R>)
method as a method reference.import ratpack.stream.Streams; import ratpack.test.exec.ExecHarness; import rx.Observable; import java.util.List; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { static class AsyncService { public <T> Observable<T> observe(final T value) { return Observable.create(subscriber -> new Thread(() -> { subscriber.onNext(value); subscriber.onCompleted(); }).start() ); } } public static void main(String[] args) throws Throwable { List<String> result = ExecHarness.yieldSingle(execution -> new AsyncService().observe("foo").extend(ratpack.rx.RxRatpack::publisher).toList() ).getValue(); assertEquals("foo", result.get(0)); } }
- Type Parameters:
T
- the type of the value observed- Parameters:
onSubscribe
- the on subscribe function- Returns:
- a ReactiveStreams publisher containing each value of the observable
-
bindExec
public static <T> Observable<T> bindExec(Observable<T> source)
Deprecated.Binds the given observable to the current execution, allowing integration of third-party asynchronous observables with Ratpack's execution model.This method is useful when you want to consume an asynchronous observable within a Ratpack execution, as an observable. It is just a combination of
promise(Observable)
andobserveEach(Promise)
.import rx.Observable; import ratpack.test.exec.ExecHarness; import java.util.Arrays; import java.util.List; import static org.junit.Assert.*; @SuppressWarnings("deprecation") public class Example { public static void main(String... args) throws Exception { Observable<String> asyncObservable = Observable.create(subscriber -> new Thread(() -> { subscriber.onNext("foo"); subscriber.onNext("bar"); subscriber.onCompleted(); }).start() ); List<String> strings = ExecHarness.yieldSingle(e -> ratpack.rx.RxRatpack.promise(asyncObservable.compose(ratpack.rx.RxRatpack::bindExec)) ).getValue(); assertEquals(Arrays.asList("foo", "bar"), strings); } }
- Type Parameters:
T
- the type of item observed- Parameters:
source
- the observable source- Returns:
- an observable stream equivalent to the given source
- See Also:
observeEach(Promise)
,promise(Observable)
-
fork
public static <T> Observable<T> fork(Observable<T> observable)
Deprecated.Parallelize an observable by forking it's execution onto a different Ratpack compute thread and automatically binding the result back to the original execution.This method can be used for simple parallel processing. It's behavior is similar to the subscribeOn but allows the use of Ratpack compute threads. Using
fork
modifies the execution of the upstream observable.This is different than
forkEach
which modifies where the downstream is executed.import ratpack.func.Pair; import ratpack.test.exec.ExecHarness; import rx.Observable; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String[] args) throws Exception { ratpack.rx.RxRatpack.initialize(); try (ExecHarness execHarness = ExecHarness.harness(6)) { Integer sum = execHarness.yield(execution -> { final String originalComputeThread = Thread.currentThread().getName(); Observable<Integer> unforkedObservable = Observable.just(1); // `map` is executed upstream from the fork; that puts it on another parallel compute thread Observable<Pair<Integer, String>> forkedObservable = Observable.just(2) .map((val) -> Pair.of(val, Thread.currentThread().getName())) .compose(ratpack.rx.RxRatpack::fork); return ratpack.rx.RxRatpack.promiseSingle( Observable.zip(unforkedObservable, forkedObservable, (Integer intVal, Pair<Integer, String> pair) -> { String forkedComputeThread = pair.right; assertNotEquals(originalComputeThread, forkedComputeThread); return intVal + pair.left; }) ); }).getValueOrThrow(); assertEquals(sum.intValue(), 3); } } }
- Type Parameters:
T
- the element type- Parameters:
observable
- the observable sequence to execute on a different compute thread- Returns:
- an observable on the compute thread that
fork
was called from - Since:
- 1.4
- See Also:
forkEach(Observable)
-
fork
public static <T> Observable<T> fork(Observable<T> observable, Action<? super RegistrySpec> doWithRegistrySpec) throws java.lang.Exception
Deprecated.A variant offork(rx.Observable<T>)
that allows access to the registry of the forked execution inside anAction
.This allows the insertion of objects via
RegistrySpec.add(java.lang.Class<O>, O)
that will be available to the forked observable.You do not have access to the original execution inside the
Action
.import ratpack.exec.Execution; import ratpack.registry.RegistrySpec; import ratpack.test.exec.ExecHarness; import rx.Observable; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String[] args) throws Exception { ratpack.rx.RxRatpack.initialize(); try (ExecHarness execHarness = ExecHarness.harness(6)) { String concatenatedResult = execHarness.yield(execution -> { Observable<String> notYetForked = Observable.just("foo") .map((value) -> value + Execution.current().get(String.class)); Observable<String> forkedObservable = ratpack.rx.RxRatpack.fork( notYetForked, (RegistrySpec registrySpec) -> registrySpec.add("bar") ); return ratpack.rx.RxRatpack.promiseSingle(forkedObservable); }).getValueOrThrow(); assertEquals(concatenatedResult, "foobar"); } } }
- Type Parameters:
T
- the element type- Parameters:
observable
- the observable sequence to execute on a different compute threaddoWithRegistrySpec
- an Action where objects can be inserted into the registry of the forked execution- Returns:
- an observable on the compute thread that
fork
was called from - Throws:
java.lang.Exception
- Since:
- 1.4
- See Also:
fork(Observable)
-
forkEach
public static <T> Observable<T> forkEach(Observable<T> observable)
Deprecated.Parallelize an observable by creating a new Ratpack execution for each element.import ratpack.util.Exceptions; import ratpack.test.exec.ExecHarness; import rx.Observable; import java.util.List; import java.util.Arrays; import java.util.LinkedList; import java.util.Collection; import java.util.Collections; import java.util.concurrent.CyclicBarrier; import static org.junit.Assert.assertEquals; @SuppressWarnings("deprecation") public class Example { public static void main(String[] args) throws Exception { ratpack.rx.RxRatpack.initialize(); CyclicBarrier barrier = new CyclicBarrier(5); try (ExecHarness execHarness = ExecHarness.harness(6)) { List<Integer> values = execHarness.yield(execution -> ratpack.rx.RxRatpack.promise( Observable.just(1, 2, 3, 4, 5) .compose(ratpack.rx.RxRatpack::forkEach) // parallelize .doOnNext(value -> Exceptions.uncheck(() -> barrier.await())) // wait for all values .map(integer -> integer.intValue() * 2) .serialize() ) ).getValue(); List<Integer> sortedValues = new LinkedList<>(values); Collections.sort(sortedValues); assertEquals(Arrays.asList(2, 4, 6, 8, 10), sortedValues); } } }
- Type Parameters:
T
- the element type- Parameters:
observable
- the observable sequence to process each element of in a forked execution- Returns:
- an observable
-
forkEach
public static <T> Observable<T> forkEach(Observable<T> observable, Action<? super RegistrySpec> doWithRegistrySpec)
Deprecated.A variant offorkEach(rx.Observable<T>)
that allows access to the registry of each forked execution inside anAction
.This allows the insertion of objects via
RegistrySpec.add(java.lang.Class<O>, O)
that will be available to every forked observable.You do not have access to the original execution inside the
Action
.- Type Parameters:
T
- the element type- Parameters:
observable
- the observable sequence to process each element of in a forked executiondoWithRegistrySpec
- an Action where objects can be inserted into the registry of the forked execution- Returns:
- an observable
- Since:
- 1.4
- See Also:
forkEach(Observable)
,fork(Observable, Action)
-
scheduler
public static Scheduler scheduler(ExecController execController)
Deprecated.A scheduler that uses the application event loop and initialises each job as anExecution
(viaExecController.fork()
).- Parameters:
execController
- the execution controller to back the scheduler- Returns:
- a scheduler
-
scheduler
public static Scheduler scheduler()
Deprecated.A scheduler that uses the application event loop and initialises each job as anExecution
(viaExecController.fork()
).That same as
scheduler(ExecController)
, but obtains the exec controller viaExecController.require()
.- Returns:
- a scheduler
-
-